You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2014/02/27 00:27:43 UTC
svn commit: r1572343 [4/7] - in /qpid/trunk/qpid/java:
broker-core/src/main/java/org/apache/qpid/server/binding/
broker-core/src/main/java/org/apache/qpid/server/consumer/
broker-core/src/main/java/org/apache/qpid/server/exchange/
broker-core/src/main/...
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java?rev=1572343&r1=1572342&r2=1572343&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java Wed Feb 26 23:27:39 2014
@@ -1,150 +1,152 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.queue;
-
-import org.apache.log4j.Logger;
-import org.apache.qpid.server.message.ServerMessage;
-
-public enum NotificationCheck
-{
-
- MESSAGE_COUNT_ALERT
- {
- public boolean notifyIfNecessary(ServerMessage<?> msg, AMQQueue queue, AMQQueue.NotificationListener listener)
- {
- int msgCount;
- final long maximumMessageCount = queue.getMaximumMessageCount();
- if (maximumMessageCount!= 0 && (msgCount = queue.getMessageCount()) >= maximumMessageCount)
- {
- String notificationMsg = msgCount + ": Maximum count on queue threshold ("+ maximumMessageCount +") breached.";
-
- logNotification(this, queue, notificationMsg);
- listener.notifyClients(this, queue, notificationMsg);
- return true;
- }
- return false;
- }
- },
- MESSAGE_SIZE_ALERT(true)
- {
- public boolean notifyIfNecessary(ServerMessage<?> msg, AMQQueue queue, AMQQueue.NotificationListener listener)
- {
- final long maximumMessageSize = queue.getMaximumMessageSize();
- if(maximumMessageSize != 0)
- {
- // Check for threshold message size
- long messageSize;
- messageSize = (msg == null) ? 0 : msg.getSize();
-
- if (messageSize >= maximumMessageSize)
- {
- String notificationMsg = messageSize + "b : Maximum message size threshold ("+ maximumMessageSize +") breached. [Message ID=" + msg.getMessageNumber() + "]";
-
- logNotification(this, queue, notificationMsg);
- listener.notifyClients(this, queue, notificationMsg);
- return true;
- }
- }
- return false;
- }
-
- },
- QUEUE_DEPTH_ALERT
- {
- public boolean notifyIfNecessary(ServerMessage<?> msg, AMQQueue queue, AMQQueue.NotificationListener listener)
- {
- // Check for threshold queue depth in bytes
- final long maximumQueueDepth = queue.getMaximumQueueDepth();
-
- if(maximumQueueDepth != 0)
- {
- final long queueDepth = queue.getQueueDepth();
-
- if (queueDepth >= maximumQueueDepth)
- {
- String notificationMsg = (queueDepth>>10) + "Kb : Maximum queue depth threshold ("+(maximumQueueDepth>>10)+"Kb) breached.";
-
- logNotification(this, queue, notificationMsg);
- listener.notifyClients(this, queue, notificationMsg);
- return true;
- }
- }
- return false;
- }
-
- },
- MESSAGE_AGE_ALERT
- {
- public boolean notifyIfNecessary(ServerMessage<?> msg, AMQQueue queue, AMQQueue.NotificationListener listener)
- {
-
- final long maxMessageAge = queue.getMaximumMessageAge();
- if(maxMessageAge != 0)
- {
- final long currentTime = System.currentTimeMillis();
- final long thresholdTime = currentTime - maxMessageAge;
- final long firstArrivalTime = queue.getOldestMessageArrivalTime();
-
- if(firstArrivalTime < thresholdTime)
- {
- long oldestAge = currentTime - firstArrivalTime;
- String notificationMsg = (oldestAge/1000) + "s : Maximum age on queue threshold ("+(maxMessageAge /1000)+"s) breached.";
-
- logNotification(this, queue, notificationMsg);
- listener.notifyClients(this, queue, notificationMsg);
-
- return true;
- }
- }
- return false;
-
- }
-
- }
- ;
-
- private static final Logger LOGGER = Logger.getLogger(NotificationCheck.class);
-
- private final boolean _messageSpecific;
-
- NotificationCheck()
- {
- this(false);
- }
-
- NotificationCheck(boolean messageSpecific)
- {
- _messageSpecific = messageSpecific;
- }
-
- public boolean isMessageSpecific()
- {
- return _messageSpecific;
- }
-
- public abstract boolean notifyIfNecessary(ServerMessage<?> msg, AMQQueue queue, AMQQueue.NotificationListener listener);
-
- //A bit of a hack, only for use until we do the logging listener
- private static void logNotification(NotificationCheck notification, AMQQueue queue, String notificationMsg)
- {
- LOGGER.info(notification.name() + " On Queue " + queue.getName() + " - " + notificationMsg);
- }
-}
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.model.QueueNotificationListener;
+
+
+public enum NotificationCheck
+{
+
+ MESSAGE_COUNT_ALERT
+ {
+ public boolean notifyIfNecessary(ServerMessage<?> msg, AMQQueue queue, QueueNotificationListener listener)
+ {
+ int msgCount;
+ final long maximumMessageCount = queue.getAlertThresholdQueueDepthMessages();
+ if (maximumMessageCount!= 0 && (msgCount = queue.getQueueDepthMessages()) >= maximumMessageCount)
+ {
+ String notificationMsg = msgCount + ": Maximum count on queue threshold ("+ maximumMessageCount +") breached.";
+
+ logNotification(this, queue, notificationMsg);
+ listener.notifyClients(this, queue, notificationMsg);
+ return true;
+ }
+ return false;
+ }
+ },
+ MESSAGE_SIZE_ALERT(true)
+ {
+ public boolean notifyIfNecessary(ServerMessage<?> msg, AMQQueue queue, QueueNotificationListener listener)
+ {
+ final long maximumMessageSize = queue.getAlertThresholdMessageSize();
+ if(maximumMessageSize != 0)
+ {
+ // Check for threshold message size
+ long messageSize;
+ messageSize = (msg == null) ? 0 : msg.getSize();
+
+ if (messageSize >= maximumMessageSize)
+ {
+ String notificationMsg = messageSize + "b : Maximum message size threshold ("+ maximumMessageSize +") breached. [Message ID=" + msg.getMessageNumber() + "]";
+
+ logNotification(this, queue, notificationMsg);
+ listener.notifyClients(this, queue, notificationMsg);
+ return true;
+ }
+ }
+ return false;
+ }
+
+ },
+ QUEUE_DEPTH_ALERT
+ {
+ public boolean notifyIfNecessary(ServerMessage<?> msg, AMQQueue queue, QueueNotificationListener listener)
+ {
+ // Check for threshold queue depth in bytes
+ final long maximumQueueDepth = queue.getAlertThresholdQueueDepthBytes();
+
+ if(maximumQueueDepth != 0)
+ {
+ final long queueDepth = queue.getQueueDepthBytes();
+
+ if (queueDepth >= maximumQueueDepth)
+ {
+ String notificationMsg = (queueDepth>>10) + "Kb : Maximum queue depth threshold ("+(maximumQueueDepth>>10)+"Kb) breached.";
+
+ logNotification(this, queue, notificationMsg);
+ listener.notifyClients(this, queue, notificationMsg);
+ return true;
+ }
+ }
+ return false;
+ }
+
+ },
+ MESSAGE_AGE_ALERT
+ {
+ public boolean notifyIfNecessary(ServerMessage<?> msg, AMQQueue queue, QueueNotificationListener listener)
+ {
+
+ final long maxMessageAge = queue.getAlertThresholdMessageAge();
+ if(maxMessageAge != 0)
+ {
+ final long currentTime = System.currentTimeMillis();
+ final long thresholdTime = currentTime - maxMessageAge;
+ final long firstArrivalTime = queue.getOldestMessageArrivalTime();
+
+ if(firstArrivalTime < thresholdTime)
+ {
+ long oldestAge = currentTime - firstArrivalTime;
+ String notificationMsg = (oldestAge/1000) + "s : Maximum age on queue threshold ("+(maxMessageAge /1000)+"s) breached.";
+
+ logNotification(this, queue, notificationMsg);
+ listener.notifyClients(this, queue, notificationMsg);
+
+ return true;
+ }
+ }
+ return false;
+
+ }
+
+ }
+ ;
+
+ private static final Logger LOGGER = Logger.getLogger(NotificationCheck.class);
+
+ private final boolean _messageSpecific;
+
+ NotificationCheck()
+ {
+ this(false);
+ }
+
+ NotificationCheck(boolean messageSpecific)
+ {
+ _messageSpecific = messageSpecific;
+ }
+
+ public boolean isMessageSpecific()
+ {
+ return _messageSpecific;
+ }
+
+ public abstract boolean notifyIfNecessary(ServerMessage<?> msg, AMQQueue queue, QueueNotificationListener listener);
+
+ //A bit of a hack, only for use until we do the logging listener
+ private static void logNotification(NotificationCheck notification, AMQQueue queue, String notificationMsg)
+ {
+ LOGGER.info(notification.name() + " On Queue " + queue.getName() + " - " + notificationMsg);
+ }
+}
Propchange: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntry.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntry.java?rev=1572343&r1=1572342&r2=1572343&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntry.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntry.java Wed Feb 26 23:27:39 2014
@@ -24,43 +24,45 @@ import org.apache.qpid.server.message.Se
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
-public abstract class OrderedQueueEntry<E extends OrderedQueueEntry<E,Q,L>, Q extends AbstractQueue<E,Q,L>, L extends OrderedQueueEntryList<E,Q,L>> extends QueueEntryImpl<E,Q,L>
+public abstract class OrderedQueueEntry extends QueueEntryImpl
{
static final AtomicReferenceFieldUpdater<OrderedQueueEntry, OrderedQueueEntry>
_nextUpdater =
AtomicReferenceFieldUpdater.newUpdater
(OrderedQueueEntry.class, OrderedQueueEntry.class, "_next");
- private volatile E _next;
+ private volatile OrderedQueueEntry _next;
- public OrderedQueueEntry(L queueEntryList)
+ public OrderedQueueEntry(OrderedQueueEntryList queueEntryList)
{
super(queueEntryList);
}
- public OrderedQueueEntry(L queueEntryList, ServerMessage message, final long entryId)
+ public OrderedQueueEntry(OrderedQueueEntryList queueEntryList, ServerMessage message, final long entryId)
{
super(queueEntryList, message, entryId);
}
- public OrderedQueueEntry(L queueEntryList, ServerMessage message)
+ public OrderedQueueEntry(OrderedQueueEntryList queueEntryList, ServerMessage message)
{
super(queueEntryList, message);
}
- public E getNextNode()
+ @Override
+ public OrderedQueueEntry getNextNode()
{
return _next;
}
- public E getNextValidEntry()
+ @Override
+ public OrderedQueueEntry getNextValidEntry()
{
- E next = getNextNode();
+ OrderedQueueEntry next = getNextNode();
while(next != null && next.isDeleted())
{
- final E newNext = next.getNextNode();
+ final OrderedQueueEntry newNext = next.getNextNode();
if(newNext != null)
{
OrderedQueueEntryList._nextUpdater.compareAndSet(this,next, newNext);
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntryList.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntryList.java?rev=1572343&r1=1572342&r2=1572343&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntryList.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntryList.java Wed Feb 26 23:27:39 2014
@@ -26,13 +26,12 @@ import java.util.concurrent.atomic.Atomi
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
-public abstract class OrderedQueueEntryList<E extends OrderedQueueEntry<E,Q,L>, Q extends AbstractQueue<E,Q,L>, L extends OrderedQueueEntryList<E,Q,L>> implements
- QueueEntryListBase<E,Q,L>
+public abstract class OrderedQueueEntryList implements QueueEntryList
{
- private final E _head;
+ private final OrderedQueueEntry _head;
- private volatile E _tail;
+ private volatile OrderedQueueEntry _tail;
static final AtomicReferenceFieldUpdater<OrderedQueueEntryList, OrderedQueueEntry>
_tailUpdater =
@@ -40,27 +39,27 @@ public abstract class OrderedQueueEntryL
(OrderedQueueEntryList.class, OrderedQueueEntry.class, "_tail");
- private final Q _queue;
+ private final AMQQueue _queue;
static final AtomicReferenceFieldUpdater<OrderedQueueEntry, OrderedQueueEntry>
_nextUpdater = OrderedQueueEntry._nextUpdater;
private AtomicLong _scavenges = new AtomicLong(0L);
private final long _scavengeCount = Integer.getInteger("qpid.queue.scavenge_count", 50);
- private final AtomicReference<E> _unscavengedHWM = new AtomicReference<E>();
+ private final AtomicReference<QueueEntry> _unscavengedHWM = new AtomicReference<QueueEntry>();
- public OrderedQueueEntryList(Q queue, HeadCreator<E,Q,L> headCreator)
+ public OrderedQueueEntryList(AMQQueue queue, HeadCreator headCreator)
{
_queue = queue;
- _head = headCreator.createHead((L)this);
+ _head = headCreator.createHead(this);
_tail = _head;
}
void scavenge()
{
- E hwm = _unscavengedHWM.getAndSet(null);
- E next = _head.getNextValidEntry();
+ QueueEntry hwm = _unscavengedHWM.getAndSet(null);
+ QueueEntry next = _head.getNextValidEntry();
if(hwm != null)
{
@@ -72,15 +71,15 @@ public abstract class OrderedQueueEntryL
}
- public Q getQueue()
+ public AMQQueue getQueue()
{
return _queue;
}
- public E add(ServerMessage message)
+ public QueueEntry add(ServerMessage message)
{
- E node = createQueueEntry(message);
+ OrderedQueueEntry node = createQueueEntry(message);
for (;;)
{
OrderedQueueEntry tail = _tail;
@@ -105,23 +104,24 @@ public abstract class OrderedQueueEntryL
}
}
- abstract protected E createQueueEntry(ServerMessage<?> message);
+ abstract protected OrderedQueueEntry createQueueEntry(ServerMessage<?> message);
- public E next(E node)
+ @Override
+ public QueueEntry next(QueueEntry node)
{
return node.getNextValidEntry();
}
- public static interface HeadCreator<E extends QueueEntryImpl<E,Q,L>, Q extends AbstractQueue<E,Q,L>, L extends QueueEntryListBase<E,Q,L>>
+ public static interface HeadCreator
{
- E createHead(L list);
+ OrderedQueueEntry createHead(QueueEntryList list);
}
- public static class QueueEntryIteratorImpl<E extends OrderedQueueEntry<E,Q,L>, Q extends AbstractQueue<E,Q,L>, L extends OrderedQueueEntryList<E,Q,L>> implements QueueEntryIterator<E,Q,L,QueueConsumer<?,E,Q,L>>
+ public static class QueueEntryIteratorImpl implements QueueEntryIterator
{
- private E _lastNode;
+ private QueueEntry _lastNode;
- QueueEntryIteratorImpl(E startNode)
+ QueueEntryIteratorImpl(QueueEntry startNode)
{
_lastNode = startNode;
}
@@ -131,14 +131,14 @@ public abstract class OrderedQueueEntryL
return _lastNode.getNextValidEntry() == null;
}
- public E getNode()
+ public QueueEntry getNode()
{
return _lastNode;
}
public boolean advance()
{
- E nextValidNode = _lastNode.getNextValidEntry();
+ QueueEntry nextValidNode = _lastNode.getNextValidEntry();
if(nextValidNode != null)
{
@@ -149,26 +149,26 @@ public abstract class OrderedQueueEntryL
}
}
- public QueueEntryIterator<E,Q,L,QueueConsumer<?,E,Q,L>> iterator()
+ public QueueEntryIterator iterator()
{
- return new QueueEntryIteratorImpl<E,Q,L>(_head);
+ return new QueueEntryIteratorImpl(_head);
}
- public E getHead()
+ public QueueEntry getHead()
{
return _head;
}
- public void entryDeleted(E queueEntry)
+ public void entryDeleted(QueueEntry queueEntry)
{
- E next = _head.getNextNode();
- E newNext = _head.getNextValidEntry();
+ QueueEntry next = _head.getNextNode();
+ QueueEntry newNext = _head.getNextValidEntry();
// the head of the queue has not been deleted, hence the deletion must have been mid queue.
if (next == newNext)
{
- E unscavengedHWM = _unscavengedHWM.get();
+ QueueEntry unscavengedHWM = _unscavengedHWM.get();
while(unscavengedHWM == null || unscavengedHWM.compareTo(queueEntry)<0)
{
_unscavengedHWM.compareAndSet(unscavengedHWM, queueEntry);
@@ -182,7 +182,7 @@ public abstract class OrderedQueueEntryL
}
else
{
- E unscavengedHWM = _unscavengedHWM.get();
+ QueueEntry unscavengedHWM = _unscavengedHWM.get();
if(unscavengedHWM != null && (next == null || unscavengedHWM.compareTo(next) < 0))
{
_unscavengedHWM.compareAndSet(unscavengedHWM, null);
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java?rev=1572343&r1=1572342&r2=1572343&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java Wed Feb 26 23:27:39 2014
@@ -24,30 +24,30 @@ import org.apache.qpid.server.virtualhos
import java.util.Map;
-public abstract class OutOfOrderQueue<E extends QueueEntryImpl<E,Q,L>, Q extends OutOfOrderQueue<E,Q,L>, L extends QueueEntryListBase<E,Q,L>> extends AbstractQueue<E,Q,L>
+public abstract class OutOfOrderQueue extends AbstractQueue
{
protected OutOfOrderQueue(VirtualHost virtualHost,
Map<String, Object> attributes,
- QueueEntryListFactory<E, Q, L> entryListFactory)
+ QueueEntryListFactory entryListFactory)
{
super(virtualHost, attributes, entryListFactory);
}
@Override
- protected void checkConsumersNotAheadOfDelivery(final E entry)
+ protected void checkConsumersNotAheadOfDelivery(final QueueEntry entry)
{
// check that all consumers are not in advance of the entry
- QueueConsumerList.ConsumerNodeIterator<E,Q,L> subIter = getConsumerList().iterator();
+ QueueConsumerList.ConsumerNodeIterator subIter = getConsumerList().iterator();
while(subIter.advance() && !entry.isAcquired())
{
- final QueueConsumer<?,E,Q,L> consumer = subIter.getNode().getConsumer();
+ final QueueConsumer<?> consumer = subIter.getNode().getConsumer();
if(!consumer.isClosed())
{
- QueueContext<E,Q,L> context = consumer.getQueueContext();
+ QueueContext context = consumer.getQueueContext();
if(context != null)
{
- E released = context.getReleasedEntry();
+ QueueEntry released = context.getReleasedEntry();
while(!entry.isAcquired() && (released == null || released.compareTo(entry) > 0))
{
if(QueueContext._releasedUpdater.compareAndSet(context,released,entry))
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueue.java?rev=1572343&r1=1572342&r2=1572343&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueue.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueue.java Wed Feb 26 23:27:39 2014
@@ -27,7 +27,7 @@ import org.apache.qpid.server.virtualhos
import java.util.Map;
-public class PriorityQueue extends OutOfOrderQueue<PriorityQueueList.PriorityQueueEntry, PriorityQueue, PriorityQueueList>
+public class PriorityQueue extends OutOfOrderQueue
{
public static final int DEFAULT_PRIORITY_LEVELS = 10;
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java?rev=1572343&r1=1572342&r2=1572343&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java Wed Feb 26 23:27:39 2014
@@ -22,23 +22,23 @@ package org.apache.qpid.server.queue;
import org.apache.qpid.server.message.ServerMessage;
-abstract public class PriorityQueueList extends OrderedQueueEntryList<PriorityQueueList.PriorityQueueEntry, PriorityQueue, PriorityQueueList>
+abstract public class PriorityQueueList extends OrderedQueueEntryList
{
public PriorityQueueList(final PriorityQueue queue,
- final HeadCreator<PriorityQueueEntry, PriorityQueue, PriorityQueueList> headCreator)
+ final HeadCreator headCreator)
{
super(queue, headCreator);
}
static class PriorityQueueMasterList extends PriorityQueueList
{
- private static final HeadCreator<PriorityQueueEntry, PriorityQueue, PriorityQueueList> DUMMY_HEAD_CREATOR =
- new HeadCreator<PriorityQueueEntry, PriorityQueue, PriorityQueueList>()
+ private static final HeadCreator DUMMY_HEAD_CREATOR =
+ new HeadCreator()
{
@Override
- public PriorityQueueEntry createHead(final PriorityQueueList list)
+ public PriorityQueueEntry createHead(final QueueEntryList list)
{
return null;
}
@@ -66,11 +66,13 @@ abstract public class PriorityQueueList
return _priorities;
}
+ @Override
public PriorityQueue getQueue()
{
return _queue;
}
+ @Override
public PriorityQueueEntry add(ServerMessage message)
{
int index = message.getMessageHeader().getPriority() - _priorityOffset;
@@ -82,7 +84,7 @@ abstract public class PriorityQueueList
{
index = 0;
}
- return _priorityLists[index].add(message);
+ return (PriorityQueueEntry) _priorityLists[index].add(message);
}
@@ -92,13 +94,14 @@ abstract public class PriorityQueueList
throw new UnsupportedOperationException();
}
- public PriorityQueueEntry next(PriorityQueueEntry node)
+ @Override
+ public PriorityQueueEntry next(QueueEntry node)
{
- PriorityQueueEntry next = node.getNextValidEntry();
+ PriorityQueueEntry next = (PriorityQueueEntry) node.getNextValidEntry();
if(next == null)
{
- final PriorityQueueList nodeEntryList = node.getQueueEntryList();
+ final PriorityQueueList nodeEntryList = (PriorityQueueList) ((PriorityQueueEntry)node).getQueueEntryList();
int index;
for(index = _priorityLists.length-1; _priorityLists[index] != nodeEntryList; index--)
{
@@ -108,16 +111,16 @@ abstract public class PriorityQueueList
while(next == null && index != 0)
{
index--;
- next = _priorityLists[index].getHead().getNextValidEntry();
+ next = (PriorityQueueEntry) _priorityLists[index].getHead().getNextValidEntry();
}
}
return next;
}
- private final class PriorityQueueEntryListIterator implements QueueEntryIterator<PriorityQueueEntry, PriorityQueue, PriorityQueueList, QueueConsumer<?,PriorityQueueEntry, PriorityQueue, PriorityQueueList>>
+ private final class PriorityQueueEntryListIterator implements QueueEntryIterator
{
- private final QueueEntryIterator<PriorityQueueEntry, PriorityQueue, PriorityQueueList,QueueConsumer<?,PriorityQueueEntry, PriorityQueue, PriorityQueueList>>[] _iterators = new QueueEntryIterator[ _priorityLists.length ];
+ private final QueueEntryIterator[] _iterators = new QueueEntryIterator[ _priorityLists.length ];
private PriorityQueueEntry _lastNode;
PriorityQueueEntryListIterator()
@@ -126,10 +129,10 @@ abstract public class PriorityQueueList
{
_iterators[i] = _priorityLists[i].iterator();
}
- _lastNode = _iterators[_iterators.length - 1].getNode();
+ _lastNode = (PriorityQueueEntry) _iterators[_iterators.length - 1].getNode();
}
-
+ @Override
public boolean atTail()
{
for(int i = 0; i < _iterators.length; i++)
@@ -142,18 +145,20 @@ abstract public class PriorityQueueList
return true;
}
+ @Override
public PriorityQueueEntry getNode()
{
return _lastNode;
}
+ @Override
public boolean advance()
{
for(int i = _iterators.length-1; i >= 0; i--)
{
if(_iterators[i].advance())
{
- _lastNode = _iterators[i].getNode();
+ _lastNode = (PriorityQueueEntry) _iterators[i].getNode();
return true;
}
}
@@ -161,23 +166,26 @@ abstract public class PriorityQueueList
}
}
+ @Override
public PriorityQueueEntryListIterator iterator()
{
return new PriorityQueueEntryListIterator();
}
+ @Override
public PriorityQueueEntry getHead()
{
- return _priorityLists[_priorities-1].getHead();
+ return (PriorityQueueEntry) _priorityLists[_priorities-1].getHead();
}
- public void entryDeleted(final PriorityQueueEntry queueEntry)
+ @Override
+ public void entryDeleted(final QueueEntry queueEntry)
{
}
}
- static class Factory implements QueueEntryListFactory<PriorityQueueEntry, PriorityQueue, PriorityQueueList>
+ static class Factory implements QueueEntryListFactory
{
private final int _priorities;
@@ -186,20 +194,20 @@ abstract public class PriorityQueueList
_priorities = priorities;
}
- public PriorityQueueList createQueueEntryList(PriorityQueue queue)
+ public PriorityQueueList createQueueEntryList(AMQQueue<?> queue)
{
- return new PriorityQueueMasterList(queue, _priorities);
+ return new PriorityQueueMasterList((PriorityQueue) queue, _priorities);
}
}
static class PriorityQueueEntrySubList extends PriorityQueueList
{
- private static final HeadCreator<PriorityQueueEntry, PriorityQueue, PriorityQueueList> HEAD_CREATOR = new HeadCreator<PriorityQueueEntry, PriorityQueue, PriorityQueueList>()
+ private static final HeadCreator HEAD_CREATOR = new HeadCreator()
{
@Override
- public PriorityQueueEntry createHead(final PriorityQueueList list)
+ public PriorityQueueEntry createHead(final QueueEntryList list)
{
- return new PriorityQueueEntry(list);
+ return new PriorityQueueEntry((PriorityQueueList) list);
}
};
private int _listPriority;
@@ -222,7 +230,7 @@ abstract public class PriorityQueueList
}
}
- static class PriorityQueueEntry extends OrderedQueueEntry<PriorityQueueEntry, PriorityQueue, PriorityQueueList>
+ static class PriorityQueueEntry extends OrderedQueueEntry
{
private PriorityQueueEntry(final PriorityQueueList queueEntryList)
{
@@ -235,9 +243,10 @@ abstract public class PriorityQueueList
}
@Override
- public int compareTo(final PriorityQueueEntry o)
+ public int compareTo(final QueueEntry o)
{
- PriorityQueueEntrySubList pqel = (PriorityQueueEntrySubList)o.getQueueEntryList();
+ PriorityQueueEntry other = (PriorityQueueEntry)o;
+ PriorityQueueEntrySubList pqel = (PriorityQueueEntrySubList)other.getQueueEntryList();
int otherPriority = pqel.getListPriority();
int thisPriority = ((PriorityQueueEntrySubList) getQueueEntryList()).getListPriority();
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java?rev=1572343&r1=1572342&r2=1572343&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java Wed Feb 26 23:27:39 2014
@@ -24,37 +24,30 @@ import org.apache.qpid.server.consumer.C
import org.apache.qpid.server.consumer.ConsumerTarget;
import org.apache.qpid.server.message.MessageInstance;
-interface QueueConsumer<T extends ConsumerTarget, E extends QueueEntryImpl<E,Q,L>, Q extends AbstractQueue<E,Q,L>, L extends QueueEntryListBase<E,Q,L>> extends Consumer
+public interface QueueConsumer<X extends QueueConsumer<X>> extends Consumer, org.apache.qpid.server.model.Consumer<X>
{
void flushBatched();
void queueEmpty();
- boolean hasInterest(E node);
+ boolean hasInterest(QueueEntry node);
- boolean wouldSuspend(E entry);
+ boolean wouldSuspend(QueueEntry entry);
- void restoreCredit(E entry);
+ void restoreCredit(QueueEntry entry);
- void send(E entry, boolean batch);
+ void send(QueueEntry entry, boolean batch);
void queueDeleted();
SubFlushRunner getRunner();
- Q getQueue();
+ AMQQueue getQueue();
- boolean resend(E e);
+ boolean resend(QueueEntry e);
- public static enum State
- {
- ACTIVE,
- SUSPENDED,
- CLOSED
- }
+ MessageInstance.ConsumerAcquiredState<X> getOwningState();
- MessageInstance.ConsumerAcquiredState<QueueConsumer<T,E,Q,L>> getOwningState();
-
- QueueContext<E,Q,L> getQueueContext();
+ QueueContext getQueueContext();
}
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java?rev=1572343&r1=1572342&r2=1572343&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java Wed Feb 26 23:27:39 2014
@@ -31,14 +31,23 @@ import org.apache.qpid.server.logging.su
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.model.Consumer;
+import org.apache.qpid.server.model.LifetimePolicy;
+import org.apache.qpid.server.model.State;
+import org.apache.qpid.server.model.adapter.AbstractConfiguredObject;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.protocol.MessageConverterRegistry;
import org.apache.qpid.server.consumer.ConsumerTarget;
import org.apache.qpid.server.util.StateChangeListener;
+import java.security.AccessControlException;
import java.text.MessageFormat;
+import java.util.Collection;
+import java.util.Collections;
import java.util.EnumMap;
import java.util.EnumSet;
+import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
@@ -46,17 +55,19 @@ import java.util.concurrent.locks.Reentr
import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.SUBSCRIPTION_FORMAT;
-class QueueConsumerImpl<T extends ConsumerTarget, E extends QueueEntryImpl<E,Q,L>, Q extends AbstractQueue<E,Q,L>, L extends QueueEntryListBase<E,Q,L>> implements QueueConsumer<T,E,Q,L>
+class QueueConsumerImpl
+ extends AbstractConfiguredObject<QueueConsumerImpl>
+ implements QueueConsumer<QueueConsumerImpl>
{
private static final Logger _logger = Logger.getLogger(QueueConsumerImpl.class);
private final AtomicBoolean _targetClosed = new AtomicBoolean(false);
private final AtomicBoolean _closed = new AtomicBoolean(false);
- private final long _id;
+ private final long _consumerNumber;
private final Lock _stateChangeLock = new ReentrantLock();
private final long _createTime = System.currentTimeMillis();
- private final MessageInstance.ConsumerAcquiredState<QueueConsumer<T,E,Q,L>> _owningState = new MessageInstance.ConsumerAcquiredState<QueueConsumer<T,E,Q,L>>(this);
+ private final MessageInstance.ConsumerAcquiredState<QueueConsumerImpl> _owningState = new MessageInstance.ConsumerAcquiredState<QueueConsumerImpl>(this);
private final boolean _acquires;
private final boolean _seesRequeues;
private final String _consumerName;
@@ -66,8 +77,9 @@ class QueueConsumerImpl<T extends Consum
private final FilterManager _filters;
private final Class<? extends ServerMessage> _messageClass;
private final Object _sessionReference;
- private final Q _queue;
- private GenericActor _logActor = new GenericActor("[" + MessageFormat.format(SUBSCRIPTION_FORMAT, getId())
+ private final AbstractQueue _queue;
+ private final boolean _exclusive;
+ private GenericActor _logActor = new GenericActor("[" + MessageFormat.format(SUBSCRIPTION_FORMAT, getConsumerNumber())
+ "(UNKNOWN)"
+ "] ");
@@ -77,14 +89,14 @@ class QueueConsumerImpl<T extends Consum
static
{
STATE_MAP.put(ConsumerTarget.State.ACTIVE, State.ACTIVE);
- STATE_MAP.put(ConsumerTarget.State.SUSPENDED, State.SUSPENDED);
- STATE_MAP.put(ConsumerTarget.State.CLOSED, State.CLOSED);
+ STATE_MAP.put(ConsumerTarget.State.SUSPENDED, State.QUIESCED);
+ STATE_MAP.put(ConsumerTarget.State.CLOSED, State.DELETED);
}
- private final T _target;
+ private final ConsumerTarget _target;
private final SubFlushRunner _runner = new SubFlushRunner(this);
- private volatile QueueContext<E,Q,L> _queueContext;
- private StateChangeListener<? super QueueConsumerImpl<T,E,Q,L>, State> _stateListener = new StateChangeListener<QueueConsumerImpl<T,E,Q,L>, State>()
+ private volatile QueueContext _queueContext;
+ private StateChangeListener<? super QueueConsumerImpl, State> _stateListener = new StateChangeListener<QueueConsumerImpl, State>()
{
public void stateChanged(QueueConsumerImpl sub, State oldState, State newState)
{
@@ -93,16 +105,18 @@ class QueueConsumerImpl<T extends Consum
};
private final boolean _noLocal;
- QueueConsumerImpl(final Q queue,
- T target, final String consumerName,
+ QueueConsumerImpl(final AbstractQueue queue,
+ ConsumerTarget target, final String consumerName,
final FilterManager filters,
final Class<? extends ServerMessage> messageClass,
EnumSet<Option> optionSet)
{
-
+ super(UUID.randomUUID(), Collections.<String,Object>emptyMap(),
+ Collections.<String,Object>emptyMap(),
+ queue.getVirtualHost().getTaskExecutor());
_messageClass = messageClass;
_sessionReference = target.getSessionModel().getConnectionReference();
- _id = SUB_ID_GENERATOR.getAndIncrement();
+ _consumerNumber = CONSUMER_NUMBER_GENERATOR.getAndIncrement();
_filters = filters;
_acquires = optionSet.contains(Option.ACQUIRES);
_seesRequeues = optionSet.contains(Option.SEES_REQUEUES);
@@ -111,6 +125,7 @@ class QueueConsumerImpl<T extends Consum
_target = target;
_queue = queue;
_noLocal = optionSet.contains(Option.NO_LOCAL);
+ _exclusive = optionSet.contains(Option.EXCLUSIVE);
setupLogging(optionSet.contains(Option.EXCLUSIVE));
// Access control
@@ -151,14 +166,14 @@ class QueueConsumerImpl<T extends Consum
{
close();
}
- final StateChangeListener<? super QueueConsumerImpl<T,E,Q,L>, State> stateListener = getStateListener();
+ final StateChangeListener<? super QueueConsumerImpl, State> stateListener = getStateListener();
if(stateListener != null)
{
stateListener.stateChanged(this, STATE_MAP.get(oldState), STATE_MAP.get(newState));
}
}
- public T getTarget()
+ public ConsumerTarget getTarget()
{
return _target;
}
@@ -166,7 +181,7 @@ class QueueConsumerImpl<T extends Consum
@Override
public void externalStateChange()
{
- getQueue().deliverAsync(this);
+ _queue.deliverAsync(this);
}
@Override
@@ -229,12 +244,12 @@ class QueueConsumerImpl<T extends Consum
_target.queueDeleted();
}
- public boolean wouldSuspend(final E msg)
+ public boolean wouldSuspend(final QueueEntry msg)
{
return !_target.allocateCredit(msg.getMessage());
}
- public void restoreCredit(final E queueEntry)
+ public void restoreCredit(final QueueEntry queueEntry)
{
_target.restoreCredit(queueEntry.getMessage());
}
@@ -244,12 +259,12 @@ class QueueConsumerImpl<T extends Consum
_target.queueEmpty();
}
- State getState()
+ public State getState()
{
return STATE_MAP.get(_target.getState());
}
- public final Q getQueue()
+ public final AMQQueue getQueue()
{
return _queue;
}
@@ -258,7 +273,7 @@ class QueueConsumerImpl<T extends Consum
{
String queueString = new QueueLogSubject(_queue).toLogString();
- _logActor = new GenericActor("[" + MessageFormat.format(SUBSCRIPTION_FORMAT, getId())
+ _logActor = new GenericActor("[" + MessageFormat.format(SUBSCRIPTION_FORMAT, getConsumerNumber())
+ "("
// queueString is [vh(/{0})/qu({1}) ] so need to trim
// ^ ^^
@@ -289,10 +304,10 @@ class QueueConsumerImpl<T extends Consum
@Override
public final void flush()
{
- getQueue().flushConsumer(this);
+ _queue.flushConsumer(this);
}
- public boolean resend(final E entry)
+ public boolean resend(final QueueEntry entry)
{
return getQueue().resend(entry, this);
}
@@ -302,27 +317,27 @@ class QueueConsumerImpl<T extends Consum
return _runner;
}
- public final long getId()
+ public final long getConsumerNumber()
{
- return _id;
+ return _consumerNumber;
}
- public final StateChangeListener<? super QueueConsumerImpl<T,E,Q,L>, State> getStateListener()
+ public final StateChangeListener<? super QueueConsumerImpl, State> getStateListener()
{
return _stateListener;
}
- public final void setStateListener(StateChangeListener<? super QueueConsumerImpl<T,E,Q,L>, State> listener)
+ public final void setStateListener(StateChangeListener<? super QueueConsumerImpl, State> listener)
{
_stateListener = listener;
}
- public final QueueContext<E,Q,L> getQueueContext()
+ public final QueueContext getQueueContext()
{
return _queueContext;
}
- final void setQueueContext(QueueContext<E,Q,L> queueContext)
+ final void setQueueContext(QueueContext queueContext)
{
_queueContext = queueContext;
}
@@ -334,10 +349,10 @@ class QueueConsumerImpl<T extends Consum
public final boolean isClosed()
{
- return getState() == State.CLOSED;
+ return getState() == State.DELETED;
}
- public final boolean hasInterest(E entry)
+ public final boolean hasInterest(QueueEntry entry)
{
//check that the message hasn't been rejected
if (entry.isRejectedBy(this))
@@ -412,7 +427,7 @@ class QueueConsumerImpl<T extends Consum
return _createTime;
}
- public final MessageInstance.ConsumerAcquiredState<QueueConsumer<T,E,Q,L>> getOwningState()
+ public final MessageInstance.ConsumerAcquiredState<QueueConsumerImpl> getOwningState()
{
return _owningState;
}
@@ -447,11 +462,126 @@ class QueueConsumerImpl<T extends Consum
return _deliveredCount.longValue();
}
- public final void send(final E entry, final boolean batch)
+ public final void send(final QueueEntry entry, final boolean batch)
{
_deliveredCount.incrementAndGet();
ServerMessage message = entry.getMessage();
_deliveredBytes.addAndGet(message.getSize());
_target.send(entry, batch);
}
+
+ @Override
+ protected boolean setState(final State currentState, final State desiredState)
+ {
+ return false;
+ }
+
+ @Override
+ public String getDistributionMode()
+ {
+ return acquires() ? "MOVE" : "COPY";
+ }
+
+ @Override
+ public String getSettlementMode()
+ {
+ return null;
+ }
+
+ @Override
+ public boolean isExclusive()
+ {
+ return _exclusive;
+ }
+
+ @Override
+ public boolean isNoLocal()
+ {
+ return _noLocal; // value captured in the constructor from Option.NO_LOCAL; the previous body called isNoLocal() itself and recursed until StackOverflowError
+ }
+
+ @Override
+ public String getSelector()
+ {
+ return null;
+ }
+
+ @Override
+ public String setName(final String currentName, final String desiredName)
+ throws IllegalStateException, AccessControlException
+ {
+ return null;
+ }
+
+ @Override
+ public boolean isDurable()
+ {
+ return false;
+ }
+
+ @Override
+ public void setDurable(final boolean durable)
+ throws IllegalStateException, AccessControlException, IllegalArgumentException
+ {
+
+ }
+
+ @Override
+ public LifetimePolicy getLifetimePolicy()
+ {
+ return LifetimePolicy.DELETE_ON_SESSION_END;
+ }
+
+ @Override
+ public LifetimePolicy setLifetimePolicy(final LifetimePolicy expected, final LifetimePolicy desired)
+ throws IllegalStateException, AccessControlException, IllegalArgumentException
+ {
+ return null;
+ }
+
+ @Override
+ public <C extends ConfiguredObject> Collection<C> getChildren(final Class<C> clazz)
+ {
+ return Collections.<C>emptyList();
+ }
+
+ @Override
+ public Object getAttribute(final String name)
+ {
+ if(ID.equals(name))
+ {
+ return getId();
+ }
+ else if(NAME.equals(name))
+ {
+ return getName();
+ }
+ else if(DURABLE.equals(name))
+ {
+ return isDurable();
+ }
+ else if(DISTRIBUTION_MODE.equals(name))
+ {
+ return getDistributionMode();
+ }
+ else if(SETTLEMENT_MODE.equals(name))
+ {
+ return getSettlementMode();
+ }
+ else if(LIFETIME_POLICY.equals(name))
+ {
+ return getLifetimePolicy();
+ }
+ else if(EXCLUSIVE.equals(name))
+ {
+ return isExclusive();
+ }
+ return super.getAttribute(name); // any other name is delegated to the superclass. NOTE(review): there is no branch here for the no-local flag or the selector even though isNoLocal()/getSelector() exist - confirm whether that is intended
+ }
+
+ @Override
+ public Collection<String> getAttributeNames()
+ {
+ return getAttributeNames(getClass());
+ }
}
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerList.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerList.java?rev=1572343&r1=1572342&r2=1572343&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerList.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerList.java Wed Feb 26 23:27:39 2014
@@ -24,19 +24,19 @@ import java.util.concurrent.atomic.Atomi
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
-class QueueConsumerList<E extends QueueEntryImpl<E,Q,L>, Q extends AbstractQueue<E, Q,L>, L extends QueueEntryListBase<E,Q,L>>
+class QueueConsumerList
{
- private final ConsumerNode<E,Q,L> _head = new ConsumerNode<E,Q,L>();
+ private final ConsumerNode _head = new ConsumerNode();
- private final AtomicReference<ConsumerNode<E,Q,L>> _tail = new AtomicReference<ConsumerNode<E,Q,L>>(_head);
- private final AtomicReference<ConsumerNode<E,Q,L>> _subNodeMarker = new AtomicReference<ConsumerNode<E,Q,L>>(_head);
+ private final AtomicReference<ConsumerNode> _tail = new AtomicReference<ConsumerNode>(_head);
+ private final AtomicReference<ConsumerNode> _subNodeMarker = new AtomicReference<ConsumerNode>(_head);
private final AtomicInteger _size = new AtomicInteger();
- public static final class ConsumerNode<E extends QueueEntryImpl<E,Q,L>, Q extends AbstractQueue<E, Q,L>, L extends QueueEntryListBase<E,Q,L>>
+ public static final class ConsumerNode
{
private final AtomicBoolean _deleted = new AtomicBoolean();
- private final AtomicReference<ConsumerNode<E,Q,L>> _next = new AtomicReference<ConsumerNode<E,Q,L>>();
- private final QueueConsumer<?,E,Q,L> _sub;
+ private final AtomicReference<ConsumerNode> _next = new AtomicReference<ConsumerNode>();
+ private final QueueConsumer<?> _sub;
public ConsumerNode()
{
@@ -45,7 +45,7 @@ class QueueConsumerList<E extends QueueE
_deleted.set(true);
}
- public ConsumerNode(final QueueConsumer<?,E,Q,L> sub)
+ public ConsumerNode(final QueueConsumer<?> sub)
{
//used for regular node construction
_sub = sub;
@@ -57,12 +57,12 @@ class QueueConsumerList<E extends QueueE
*
* @return the next non-deleted node, or null if none was found.
*/
- public ConsumerNode<E,Q,L> findNext()
+ public ConsumerNode findNext()
{
- ConsumerNode<E,Q,L> next = nextNode();
+ ConsumerNode next = nextNode();
while(next != null && next.isDeleted())
{
- final ConsumerNode<E,Q,L> newNext = next.nextNode();
+ final ConsumerNode newNext = next.nextNode();
if(newNext != null)
{
//try to move our _next reference forward to the 'newNext'
@@ -86,7 +86,7 @@ class QueueConsumerList<E extends QueueE
*
* @return the immediately next node in the structure, or null if at the tail.
*/
- protected ConsumerNode<E,Q,L> nextNode()
+ protected ConsumerNode nextNode()
{
return _next.get();
}
@@ -97,7 +97,7 @@ class QueueConsumerList<E extends QueueE
* @param node the ConsumerNode to set as 'next'
* @return whether the operation succeeded
*/
- private boolean setNext(final ConsumerNode<E,Q,L> node)
+ private boolean setNext(final ConsumerNode node)
{
return _next.compareAndSet(null, node);
}
@@ -112,18 +112,18 @@ class QueueConsumerList<E extends QueueE
return _deleted.compareAndSet(false,true);
}
- public QueueConsumer<?,E,Q,L> getConsumer()
+ public QueueConsumer<?> getConsumer()
{
return _sub;
}
}
- private void insert(final ConsumerNode<E,Q,L> node, final boolean count)
+ private void insert(final ConsumerNode node, final boolean count)
{
for (;;)
{
- ConsumerNode<E,Q,L> tail = _tail.get();
- ConsumerNode<E,Q,L> next = tail.nextNode();
+ ConsumerNode tail = _tail.get();
+ ConsumerNode next = tail.nextNode();
if (tail == _tail.get())
{
if (next == null)
@@ -146,16 +146,16 @@ class QueueConsumerList<E extends QueueE
}
}
- public void add(final QueueConsumer<?,E,Q,L> sub)
+ public void add(final QueueConsumer<?> sub)
{
- ConsumerNode<E,Q,L> node = new ConsumerNode<E,Q,L>(sub);
+ ConsumerNode node = new ConsumerNode(sub);
insert(node, true);
}
- public boolean remove(final QueueConsumer<?, E,Q,L> sub)
+ public boolean remove(final QueueConsumer<?> sub)
{
- ConsumerNode<E,Q,L> prevNode = _head;
- ConsumerNode<E,Q,L> node = _head.nextNode();
+ ConsumerNode prevNode = _head;
+ ConsumerNode node = _head.nextNode();
while(node != null)
{
@@ -170,7 +170,7 @@ class QueueConsumerList<E extends QueueE
//correctness reasons, however we have just 'deleted'
//the tail. Inserting an empty dummy node after it will
//let us scavenge the node containing the Consumer.
- insert(new ConsumerNode<E,Q,L>(), false);
+ insert(new ConsumerNode(), false);
}
//advance the next node reference in the 'prevNode' to scavenge
@@ -189,9 +189,9 @@ class QueueConsumerList<E extends QueueE
return false;
}
- private void nodeMarkerCleanup(final ConsumerNode<E,Q,L> node)
+ private void nodeMarkerCleanup(final ConsumerNode node)
{
- ConsumerNode<E,Q,L> markedNode = _subNodeMarker.get();
+ ConsumerNode markedNode = _subNodeMarker.get();
if(node == markedNode)
{
//if the marked node is the one we are removing, then
@@ -200,7 +200,7 @@ class QueueConsumerList<E extends QueueE
//into the list and find the next node to use.
//Because we inserted a dummy if node was the
//tail, markedNode.nextNode() can never be null.
- ConsumerNode<E,Q,L> dummy = new ConsumerNode<E,Q,L>();
+ ConsumerNode dummy = new ConsumerNode();
dummy.setNext(markedNode.nextNode());
//if the CAS fails the marked node has changed, thus
@@ -219,7 +219,7 @@ class QueueConsumerList<E extends QueueE
}
}
- public boolean updateMarkedNode(final ConsumerNode<E,Q,L> expected, final ConsumerNode<E,Q,L> nextNode)
+ public boolean updateMarkedNode(final ConsumerNode expected, final ConsumerNode nextNode)
{
return _subNodeMarker.compareAndSet(expected, nextNode);
}
@@ -231,41 +231,41 @@ class QueueConsumerList<E extends QueueE
*
* @return the previously marked node (or a dummy if it was subsequently deleted)
*/
- public ConsumerNode<E,Q,L> getMarkedNode()
+ public ConsumerNode getMarkedNode()
{
return _subNodeMarker.get();
}
- public static class ConsumerNodeIterator<E extends QueueEntryImpl<E,Q,L>, Q extends AbstractQueue<E, Q,L>, L extends QueueEntryListBase<E,Q,L>>
+ public static class ConsumerNodeIterator
{
- private ConsumerNode<E,Q,L> _lastNode;
+ private ConsumerNode _lastNode;
- ConsumerNodeIterator(ConsumerNode<E,Q,L> startNode)
+ ConsumerNodeIterator(ConsumerNode startNode)
{
_lastNode = startNode;
}
- public ConsumerNode<E,Q,L> getNode()
+ public ConsumerNode getNode()
{
return _lastNode;
}
public boolean advance()
{
- ConsumerNode<E,Q,L> nextNode = _lastNode.findNext();
+ ConsumerNode nextNode = _lastNode.findNext();
_lastNode = nextNode;
return _lastNode != null;
}
}
- public ConsumerNodeIterator<E,Q,L> iterator()
+ public ConsumerNodeIterator iterator()
{
- return new ConsumerNodeIterator<E,Q,L>(_head);
+ return new ConsumerNodeIterator(_head);
}
- public ConsumerNode<E,Q,L> getHead()
+ public ConsumerNode getHead()
{
return _head;
}
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueContext.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueContext.java?rev=1572343&r1=1572342&r2=1572343&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueContext.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueContext.java Wed Feb 26 23:27:39 2014
@@ -23,32 +23,32 @@ package org.apache.qpid.server.queue;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
-final class QueueContext<E extends QueueEntryImpl<E,Q,L>, Q extends AbstractQueue<E,Q,L>, L extends QueueEntryListBase<E,Q,L>>
+final class QueueContext
{
- private volatile E _lastSeenEntry;
- private volatile E _releasedEntry;
+ private volatile QueueEntry _lastSeenEntry;
+ private volatile QueueEntry _releasedEntry;
- static final AtomicReferenceFieldUpdater<QueueContext, QueueEntryImpl>
+ static final AtomicReferenceFieldUpdater<QueueContext, QueueEntry>
_lastSeenUpdater =
AtomicReferenceFieldUpdater.newUpdater
- (QueueContext.class, QueueEntryImpl.class, "_lastSeenEntry");
- static final AtomicReferenceFieldUpdater<QueueContext, QueueEntryImpl>
+ (QueueContext.class, QueueEntry.class, "_lastSeenEntry");
+ static final AtomicReferenceFieldUpdater<QueueContext, QueueEntry>
_releasedUpdater =
AtomicReferenceFieldUpdater.newUpdater
- (QueueContext.class, QueueEntryImpl.class, "_releasedEntry");
+ (QueueContext.class, QueueEntry.class, "_releasedEntry");
- public QueueContext(E head)
+ public QueueContext(QueueEntry head)
{
_lastSeenEntry = head;
}
- public E getLastSeenEntry()
+ public QueueEntry getLastSeenEntry()
{
return _lastSeenEntry;
}
- E getReleasedEntry()
+ QueueEntry getReleasedEntry()
{
return _releasedEntry;
}
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java?rev=1572343&r1=1572342&r2=1572343&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java Wed Feb 26 23:27:39 2014
@@ -23,18 +23,18 @@ package org.apache.qpid.server.queue;
import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.message.MessageInstance;
-public interface QueueEntry<E extends QueueEntry<E,Q,C>, Q extends AMQQueue<E,Q,C>, C extends Consumer> extends MessageInstance<E,C>, Comparable<E>
+public interface QueueEntry extends MessageInstance, Comparable<QueueEntry>
{
- Q getQueue();
+ AMQQueue getQueue();
long getSize();
boolean isQueueDeleted();
- E getNextNode();
+ QueueEntry getNextNode();
- E getNextValidEntry();
+ QueueEntry getNextValidEntry();
}
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java?rev=1572343&r1=1572342&r2=1572343&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java Wed Feb 26 23:27:39 2014
@@ -22,7 +22,7 @@ package org.apache.qpid.server.queue;
import org.apache.log4j.Logger;
-import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.exchange.ExchangeImpl;
import org.apache.qpid.server.filter.Filterable;
import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.MessageInstance;
@@ -43,11 +43,11 @@ import java.util.concurrent.atomic.Atomi
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
-public abstract class QueueEntryImpl<E extends QueueEntryImpl<E,Q,L>, Q extends AbstractQueue<E,Q,L>, L extends QueueEntryListBase<E,Q,L>> implements QueueEntry<E,Q,QueueConsumer<?,E,Q,L>>
+public abstract class QueueEntryImpl implements QueueEntry
{
private static final Logger _log = Logger.getLogger(QueueEntryImpl.class);
- private final L _queueEntryList;
+ private final QueueEntryList _queueEntryList;
private final MessageReference _message;
@@ -62,7 +62,7 @@ public abstract class QueueEntryImpl<E e
(QueueEntryImpl.class, EntryState.class, "_state");
- private volatile Set<StateChangeListener<? super E, State>> _stateChangeListeners;
+ private volatile Set<StateChangeListener<? super QueueEntry, State>> _stateChangeListeners;
private static final
AtomicReferenceFieldUpdater<QueueEntryImpl, Set>
@@ -89,14 +89,14 @@ public abstract class QueueEntryImpl<E e
private boolean _deliveredToConsumer;
- public QueueEntryImpl(L queueEntryList)
+ public QueueEntryImpl(QueueEntryList queueEntryList)
{
this(queueEntryList,null,Long.MIN_VALUE);
_state = DELETED_STATE;
}
- public QueueEntryImpl(L queueEntryList, ServerMessage message, final long entryId)
+ public QueueEntryImpl(QueueEntryList queueEntryList, ServerMessage message, final long entryId)
{
_queueEntryList = queueEntryList;
@@ -106,7 +106,7 @@ public abstract class QueueEntryImpl<E e
populateInstanceProperties();
}
- public QueueEntryImpl(L queueEntryList, ServerMessage message)
+ public QueueEntryImpl(QueueEntryList queueEntryList, ServerMessage message)
{
_queueEntryList = queueEntryList;
_message = message == null ? null : message.newReference();
@@ -137,7 +137,7 @@ public abstract class QueueEntryImpl<E e
return _entryId;
}
- public Q getQueue()
+ public AMQQueue getQueue()
{
return _queueEntryList.getQueue();
}
@@ -201,9 +201,9 @@ public abstract class QueueEntryImpl<E e
return acquired;
}
- public boolean acquire(QueueConsumer<?,E,Q,L> sub)
+ public boolean acquire(Consumer sub)
{
- final boolean acquired = acquire(sub.getOwningState());
+ final boolean acquired = acquire(((QueueConsumer<?>)sub).getOwningState());
if(acquired)
{
_deliveredToConsumer = true;
@@ -217,7 +217,7 @@ public abstract class QueueEntryImpl<E e
return (_state instanceof ConsumerAcquiredState);
}
- public boolean isAcquiredBy(QueueConsumer consumer)
+ public boolean isAcquiredBy(Consumer consumer)
{
EntryState state = _state;
return state instanceof ConsumerAcquiredState
@@ -233,12 +233,12 @@ public abstract class QueueEntryImpl<E e
if(state instanceof ConsumerAcquiredState)
{
- getQueue().decrementUnackedMsgCount((E) this);
+ getQueue().decrementUnackedMsgCount(this);
}
if(!getQueue().isDeleted())
{
- getQueue().requeue((E)this);
+ getQueue().requeue(this);
if(_stateChangeListeners != null)
{
notifyStateChange(QueueEntry.State.ACQUIRED, QueueEntry.State.AVAILABLE);
@@ -287,7 +287,7 @@ public abstract class QueueEntryImpl<E e
_rejectedBy = new HashSet<Long>();
}
- _rejectedBy.add(consumer.getId());
+ _rejectedBy.add(consumer.getConsumerNumber());
}
else
{
@@ -295,12 +295,12 @@ public abstract class QueueEntryImpl<E e
}
}
- public boolean isRejectedBy(QueueConsumer consumer)
+ public boolean isRejectedBy(Consumer consumer)
{
if (_rejectedBy != null) // We have consumers that rejected this message
{
- return _rejectedBy.contains(consumer.getId());
+ return _rejectedBy.contains(consumer.getConsumerNumber());
}
else // This message hasn't been rejected yet.
{
@@ -316,10 +316,10 @@ public abstract class QueueEntryImpl<E e
{
if (state instanceof ConsumerAcquiredState)
{
- getQueue().decrementUnackedMsgCount((E) this);
+ getQueue().decrementUnackedMsgCount(this);
}
- getQueue().dequeue((E)this);
+ getQueue().dequeue(this);
if(_stateChangeListeners != null)
{
notifyStateChange(state.getState() , QueueEntry.State.DEQUEUED);
@@ -331,9 +331,9 @@ public abstract class QueueEntryImpl<E e
private void notifyStateChange(final State oldState, final State newState)
{
- for(StateChangeListener<? super E, State> l : _stateChangeListeners)
+ for(StateChangeListener<? super QueueEntry, State> l : _stateChangeListeners)
{
- l.stateChanged((E)this, oldState, newState);
+ l.stateChanged(this, oldState, newState);
}
}
@@ -343,7 +343,7 @@ public abstract class QueueEntryImpl<E e
if(state != DELETED_STATE && _stateUpdater.compareAndSet(this,state,DELETED_STATE))
{
- _queueEntryList.entryDeleted((E)this);
+ _queueEntryList.entryDeleted(this);
onDelete();
_message.release();
@@ -362,10 +362,10 @@ public abstract class QueueEntryImpl<E e
dispose();
}
- public int routeToAlternate(final Action<? super MessageInstance<?, ? extends Consumer>> action, ServerTransaction txn)
+ public int routeToAlternate(final Action<? super MessageInstance> action, ServerTransaction txn)
{
final AMQQueue currentQueue = getQueue();
- Exchange alternateExchange = currentQueue.getAlternateExchange();
+ ExchangeImpl alternateExchange = currentQueue.getAlternateExchange();
boolean autocommit = txn == null;
int enqueues;
@@ -412,21 +412,21 @@ public abstract class QueueEntryImpl<E e
return getQueue().isDeleted();
}
- public void addStateChangeListener(StateChangeListener<? super E,State> listener)
+ public void addStateChangeListener(StateChangeListener<? super MessageInstance,State> listener)
{
- Set<StateChangeListener<? super E, State>> listeners = _stateChangeListeners;
+ Set<StateChangeListener<? super QueueEntry, State>> listeners = _stateChangeListeners;
if(listeners == null)
{
- _listenersUpdater.compareAndSet(this, null, new CopyOnWriteArraySet<StateChangeListener<? super E, State>>());
+ _listenersUpdater.compareAndSet(this, null, new CopyOnWriteArraySet<StateChangeListener<? super QueueEntry, State>>());
listeners = _stateChangeListeners;
}
listeners.add(listener);
}
- public boolean removeStateChangeListener(StateChangeListener<? super E, State> listener)
+ public boolean removeStateChangeListener(StateChangeListener<? super MessageInstance, State> listener)
{
- Set<StateChangeListener<? super E, State>> listeners = _stateChangeListeners;
+ Set<StateChangeListener<? super QueueEntry, State>> listeners = _stateChangeListeners;
if(listeners != null)
{
return listeners.remove(listener);
@@ -436,9 +436,13 @@ public abstract class QueueEntryImpl<E e
}
- public int compareTo(final E o)
+ public int compareTo(final QueueEntry o)
{
- E other = o;
+ if(o == null)
+ {
+ return 1;
+ }
+ QueueEntryImpl other = (QueueEntryImpl)o;
return getEntryId() > other.getEntryId() ? 1 : getEntryId() < other.getEntryId() ? -1 : 0;
}
@@ -446,7 +450,7 @@ public abstract class QueueEntryImpl<E e
{
}
- public L getQueueEntryList()
+ public QueueEntryList getQueueEntryList()
{
return _queueEntryList;
}
@@ -464,7 +468,7 @@ public abstract class QueueEntryImpl<E e
@Override
public int getMaximumDeliveryCount()
{
- return getQueue().getMaximumDeliveryCount();
+ return getQueue().getMaximumDeliveryAttempts();
}
public void incrementDeliveryCount()
@@ -494,10 +498,10 @@ public abstract class QueueEntryImpl<E e
@Override
public boolean resend()
{
- QueueConsumer<?,E,Q,L> sub = getDeliveredConsumer();
+ QueueConsumer sub = getDeliveredConsumer();
if(sub != null)
{
- return sub.resend((E)this);
+ return sub.resend(this);
}
return false;
}
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryIterator.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryIterator.java?rev=1572343&r1=1572342&r2=1572343&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryIterator.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryIterator.java Wed Feb 26 23:27:39 2014
@@ -20,13 +20,11 @@
*/
package org.apache.qpid.server.queue;
-import org.apache.qpid.server.consumer.Consumer;
-
-public interface QueueEntryIterator<E extends QueueEntry<E,Q,C>, Q extends AMQQueue<E,Q,C>, L extends QueueEntryList<E,Q,L,C>, C extends Consumer>
+public interface QueueEntryIterator
{
boolean atTail();
- E getNode();
+ QueueEntry getNode();
boolean advance();
}
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java?rev=1572343&r1=1572342&r2=1572343&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java Wed Feb 26 23:27:39 2014
@@ -23,19 +23,19 @@ package org.apache.qpid.server.queue;
import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.message.ServerMessage;
-public interface QueueEntryList<E extends QueueEntry<E,Q,C>, Q extends AMQQueue<E,Q,C>, L extends QueueEntryList<E,Q,L,C>, C extends Consumer>
+public interface QueueEntryList
{
- Q getQueue();
+ AMQQueue getQueue();
- E add(ServerMessage message);
+ QueueEntry add(ServerMessage message);
- E next(E node);
+ QueueEntry next(QueueEntry node);
- QueueEntryIterator<E,Q,L,C> iterator();
+ QueueEntryIterator iterator();
- E getHead();
+ QueueEntry getHead();
- void entryDeleted(E queueEntry);
+ void entryDeleted(QueueEntry queueEntry);
int getPriorities();
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryListBase.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryListBase.java?rev=1572343&r1=1572342&r2=1572343&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryListBase.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryListBase.java Wed Feb 26 23:27:39 2014
@@ -20,6 +20,6 @@
*/
package org.apache.qpid.server.queue;
-public interface QueueEntryListBase<E extends QueueEntryImpl<E,Q,L>, Q extends AbstractQueue<E,Q,L>, L extends QueueEntryListBase<E,Q,L>> extends QueueEntryList<E,Q,L,QueueConsumer<?,E,Q,L>>
+public interface QueueEntryListBase extends QueueEntryList
{
}
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryListFactory.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryListFactory.java?rev=1572343&r1=1572342&r2=1572343&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryListFactory.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryListFactory.java Wed Feb 26 23:27:39 2014
@@ -20,7 +20,7 @@
*/
package org.apache.qpid.server.queue;
-interface QueueEntryListFactory<E extends QueueEntryImpl<E,Q,L>, Q extends AbstractQueue<E,Q,L>, L extends QueueEntryListBase<E,Q,L>>
+interface QueueEntryListFactory
{
- public L createQueueEntryList(Q queue);
+ public QueueEntryList createQueueEntryList(AMQQueue<?> queue);
}
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryVisitor.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryVisitor.java?rev=1572343&r1=1572342&r2=1572343&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryVisitor.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryVisitor.java Wed Feb 26 23:27:39 2014
@@ -16,9 +16,7 @@
*/
package org.apache.qpid.server.queue;
-import org.apache.qpid.server.consumer.Consumer;
-
-public interface QueueEntryVisitor<E extends QueueEntry>
+public interface QueueEntryVisitor
{
- boolean visit(E entry);
+ boolean visit(QueueEntry entry);
}
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java?rev=1572343&r1=1572342&r2=1572343&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java Wed Feb 26 23:27:39 2014
@@ -28,7 +28,7 @@ import org.apache.qpid.server.virtualhos
import java.util.Map;
-public class SortedQueue extends OutOfOrderQueue<SortedQueueEntry, SortedQueue, SortedQueueEntryList>
+public class SortedQueue extends OutOfOrderQueue
{
//Lock object to synchronize enqueue. Used instead of the object
//monitor to prevent lock order issues with consumer sendLocks
@@ -38,7 +38,7 @@ public class SortedQueue extends OutOfOr
protected SortedQueue(VirtualHost virtualHost,
Map<String, Object> attributes,
- QueueEntryListFactory<SortedQueueEntry, SortedQueue, SortedQueueEntryList> factory)
+ QueueEntryListFactory factory)
{
super(virtualHost, attributes, factory);
_sortedPropertyName = MapValueConverter.getStringAttribute(Queue.SORT_KEY,attributes);
@@ -62,7 +62,7 @@ public class SortedQueue extends OutOfOr
@Override
public void enqueue(final ServerMessage message,
- final Action<? super MessageInstance<?, QueueConsumer<?, SortedQueueEntry, SortedQueue, SortedQueueEntryList>>> action)
+ final Action<? super MessageInstance> action)
{
synchronized (_sortedQueueLock)
{
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntry.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntry.java?rev=1572343&r1=1572342&r2=1572343&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntry.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntry.java Wed Feb 26 23:27:39 2014
@@ -24,7 +24,7 @@ import org.apache.qpid.server.message.Se
/**
* An implementation of QueueEntryImpl to be used in SortedQueueEntryList.
*/
-public class SortedQueueEntry extends QueueEntryImpl<SortedQueueEntry, SortedQueue, SortedQueueEntryList>
+public class SortedQueueEntry extends QueueEntryImpl
{
public static enum Colour
{
@@ -52,8 +52,9 @@ public class SortedQueueEntry extends Qu
}
@Override
- public int compareTo(final SortedQueueEntry o)
+ public int compareTo(final QueueEntry other)
{
+ SortedQueueEntry o = (SortedQueueEntry)other;
final String otherKey = o._key;
final int compare = _key == null ? (otherKey == null ? 0 : -1) : otherKey == null ? 1 : _key.compareTo(otherKey);
return compare == 0 ? super.compareTo(o) : compare;
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java?rev=1572343&r1=1572342&r2=1572343&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java Wed Feb 26 23:27:39 2014
@@ -30,7 +30,7 @@ import org.apache.qpid.server.queue.Sort
* ISBN-13: 978-0262033848
* see http://en.wikipedia.org/wiki/Red-black_tree
*/
-public class SortedQueueEntryList implements QueueEntryListBase<SortedQueueEntry, SortedQueue, SortedQueueEntryList>
+public class SortedQueueEntryList implements QueueEntryList
{
private final SortedQueueEntry _head;
private SortedQueueEntry _root;
@@ -279,8 +279,9 @@ public class SortedQueueEntryList implem
return (node == null ? Colour.BLACK : node.getColour()) == colour;
}
- public SortedQueueEntry next(final SortedQueueEntry node)
+ public SortedQueueEntry next(final QueueEntry entry)
{
+ SortedQueueEntry node = (SortedQueueEntry)entry;
synchronized(_lock)
{
if(node.isDeleted() && _head != node)
@@ -308,7 +309,7 @@ public class SortedQueueEntryList implem
}
}
- public QueueEntryIterator<SortedQueueEntry,SortedQueue,SortedQueueEntryList,QueueConsumer<?,SortedQueueEntry,SortedQueue,SortedQueueEntryList>> iterator()
+ public QueueEntryIterator iterator()
{
return new QueueEntryIteratorImpl(_head);
}
@@ -323,8 +324,9 @@ public class SortedQueueEntryList implem
return _root;
}
- public void entryDeleted(final SortedQueueEntry entry)
+ public void entryDeleted(final QueueEntry e)
{
+ SortedQueueEntry entry = (SortedQueueEntry)e;
synchronized(_lock)
{
// If the node to be removed has two children, we swap the position
@@ -618,7 +620,7 @@ public class SortedQueueEntryList implem
return x == null ? null : x.getColour();
}
- public class QueueEntryIteratorImpl implements QueueEntryIterator<SortedQueueEntry,SortedQueue,SortedQueueEntryList,QueueConsumer<?,SortedQueueEntry,SortedQueue,SortedQueueEntryList>>
+ public class QueueEntryIteratorImpl implements QueueEntryIterator
{
private SortedQueueEntry _lastNode;
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryListFactory.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryListFactory.java?rev=1572343&r1=1572342&r2=1572343&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryListFactory.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryListFactory.java Wed Feb 26 23:27:39 2014
@@ -19,7 +19,7 @@
*/
package org.apache.qpid.server.queue;
-public class SortedQueueEntryListFactory implements QueueEntryListFactory<SortedQueueEntry,SortedQueue,SortedQueueEntryList>
+public class SortedQueueEntryListFactory implements QueueEntryListFactory
{
private final String _propertyName;
@@ -30,8 +30,8 @@ public class SortedQueueEntryListFactory
}
@Override
- public SortedQueueEntryList createQueueEntryList(final SortedQueue queue)
+ public SortedQueueEntryList createQueueEntryList(final AMQQueue<?> queue)
{
- return new SortedQueueEntryList(queue, _propertyName);
+ return new SortedQueueEntryList((SortedQueue) queue, _propertyName);
}
}
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueue.java?rev=1572343&r1=1572342&r2=1572343&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueue.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueue.java Wed Feb 26 23:27:39 2014
@@ -25,7 +25,7 @@ import org.apache.qpid.server.virtualhos
import java.util.Map;
-public class StandardQueue extends AbstractQueue<StandardQueueEntry,StandardQueue,StandardQueueEntryList>
+public class StandardQueue extends AbstractQueue
{
public StandardQueue(final VirtualHost virtualHost,
final Map<String, Object> arguments)
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueEntry.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueEntry.java?rev=1572343&r1=1572342&r2=1572343&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueEntry.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueEntry.java Wed Feb 26 23:27:39 2014
@@ -22,7 +22,7 @@ package org.apache.qpid.server.queue;
import org.apache.qpid.server.message.ServerMessage;
-public class StandardQueueEntry extends OrderedQueueEntry<StandardQueueEntry, StandardQueue, StandardQueueEntryList>
+public class StandardQueueEntry extends OrderedQueueEntry
{
protected StandardQueueEntry(final StandardQueueEntryList queueEntryList)
{
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueEntryList.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueEntryList.java?rev=1572343&r1=1572342&r2=1572343&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueEntryList.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueEntryList.java Wed Feb 26 23:27:39 2014
@@ -22,15 +22,15 @@ package org.apache.qpid.server.queue;
import org.apache.qpid.server.message.ServerMessage;
-public class StandardQueueEntryList extends OrderedQueueEntryList<StandardQueueEntry, StandardQueue, StandardQueueEntryList>
+public class StandardQueueEntryList extends OrderedQueueEntryList
{
- private static final HeadCreator<StandardQueueEntry, StandardQueue, StandardQueueEntryList> HEAD_CREATOR = new HeadCreator<StandardQueueEntry, StandardQueue, StandardQueueEntryList>()
+ private static final HeadCreator HEAD_CREATOR = new HeadCreator()
{
@Override
- public StandardQueueEntry createHead(final StandardQueueEntryList list)
+ public StandardQueueEntry createHead(final QueueEntryList list)
{
- return new StandardQueueEntry(list);
+ return new StandardQueueEntry((StandardQueueEntryList) list);
}
};
@@ -45,12 +45,12 @@ public class StandardQueueEntryList exte
return new StandardQueueEntry(this, message);
}
- static class Factory implements QueueEntryListFactory<StandardQueueEntry, StandardQueue, StandardQueueEntryList>
+ static class Factory implements QueueEntryListFactory
{
- public StandardQueueEntryList createQueueEntryList(StandardQueue queue)
+ public StandardQueueEntryList createQueueEntryList(AMQQueue<?> queue)
{
- return new StandardQueueEntryList(queue);
+ return new StandardQueueEntryList((StandardQueue) queue);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org