You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2006/03/07 20:18:28 UTC
svn commit: r383969 - in
/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region:
DurableTopicSubscription.java PrefetchSubscription.java
QueueBrowserSubscription.java QueueSubscription.java
Author: chirino
Date: Tue Mar 7 11:18:28 2006
New Revision: 383969
URL: http://svn.apache.org/viewcvs?rev=383969&view=rev
Log:
renamed matched to pending.
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java?rev=383969&r1=383968&r2=383969&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java Tue Mar 7 11:18:28 2006
@@ -97,12 +97,12 @@
iter.remove();
}
- for (Iterator iter = matched.iterator(); iter.hasNext();) {
+ for (Iterator iter = pending.iterator(); iter.hasNext();) {
MessageReference node = (MessageReference) iter.next();
// node.decrementTargetCount();
iter.remove();
}
- delivered=0;
+ prefetchExtension=0;
}
protected MessageDispatch createMessageDispatch(MessageReference node, Message message) {
@@ -156,8 +156,8 @@
" consumer="+info.getConsumerId()+
", destinations="+destinations.size()+
", dispatched="+dispatched.size()+
- ", delivered="+this.delivered+
- ", matched="+this.matched.size();
+ ", delivered="+this.prefetchExtension+
+ ", pending="+this.pending.size();
}
public String getClientId() {
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=383969&r1=383968&r2=383969&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Tue Mar 7 11:18:28 2006
@@ -42,17 +42,17 @@
abstract public class PrefetchSubscription extends AbstractSubscription{
static private final Log log=LogFactory.getLog(PrefetchSubscription.class);
- final protected LinkedList matched=new LinkedList();
+ final protected LinkedList pending=new LinkedList();
final protected LinkedList dispatched=new LinkedList();
- protected int delivered=0;
+ protected int prefetchExtension=0;
int preLoadLimit=1024*100;
int preLoadSize=0;
boolean dispatching=false;
long enqueueCounter;
long dispatchCounter;
- long aknowledgedCounter;
+ long dequeueCounter;
public PrefetchSubscription(Broker broker,ConnectionContext context,ConsumerInfo info)
throws InvalidSelectorException{
@@ -64,15 +64,15 @@
if(!isFull()&&!isSlaveBroker()){
dispatch(node);
}else{
- synchronized(matched){
- matched.addLast(node);
+ synchronized(pending){
+ pending.addLast(node);
}
}
}
public void processMessageDispatchNotification(MessageDispatchNotification mdn){
- synchronized(matched){
- for(Iterator i=matched.iterator();i.hasNext();){
+ synchronized(pending){
+ for(Iterator i=pending.iterator();i.hasNext();){
MessageReference node=(MessageReference) i.next();
if(node.getMessageId().equals(mdn.getMessageId())){
i.remove();
@@ -106,16 +106,16 @@
if(inAckRange){
// Don't remove the nodes until we are committed.
if(!context.isInTransaction()){
- aknowledgedCounter++;
+ dequeueCounter++;
iter.remove();
}else{
// setup a Synchronization to remove nodes from the dispatched list.
context.getTransaction().addSynchronization(new Synchronization(){
public void afterCommit() throws Exception{
synchronized(PrefetchSubscription.this){
- aknowledgedCounter++;
+ dequeueCounter++;
dispatched.remove(node);
- delivered--;
+ prefetchExtension--;
}
}
});
@@ -124,9 +124,9 @@
acknowledge(context,ack,node);
if(ack.getLastMessageId().equals(messageId)){
if(context.isInTransaction())
- delivered=Math.max(delivered,index+1);
+ prefetchExtension=Math.max(prefetchExtension,index+1);
else
- delivered=Math.max(0,delivered-(index+1));
+ prefetchExtension=Math.max(0,prefetchExtension-(index+1));
if(wasFull&&!isFull()){
dispatchMatched();
}
@@ -144,7 +144,7 @@
for(Iterator iter=dispatched.iterator();iter.hasNext();index++){
final MessageReference node=(MessageReference) iter.next();
if(ack.getLastMessageId().equals(node.getMessageId())){
- delivered=Math.max(delivered,index+1);
+ prefetchExtension=Math.max(prefetchExtension,index+1);
if(wasFull&&!isFull()){
dispatchMatched();
}
@@ -184,11 +184,11 @@
node.decrementReferenceCount();
}
iter.remove();
- aknowledgedCounter++;
+ dequeueCounter++;
index++;
acknowledge(context,ack,node);
if(ack.getLastMessageId().equals(messageId)){
- delivered=Math.max(0,delivered-(index+1));
+ prefetchExtension=Math.max(0,prefetchExtension-(index+1));
if(wasFull&&!isFull()){
dispatchMatched();
}
@@ -202,11 +202,11 @@
}
protected boolean isFull(){
- return dispatched.size()-delivered>=info.getPrefetchSize()||preLoadSize>preLoadLimit;
+ return dispatched.size()-prefetchExtension>=info.getPrefetchSize()||preLoadSize>preLoadLimit;
}
synchronized public int getPendingQueueSize(){
- return matched.size();
+ return pending.size();
}
synchronized public int getDispatchedQueueSize(){
@@ -214,7 +214,7 @@
}
synchronized public long getDequeueCounter(){
- return aknowledgedCounter;
+ return dequeueCounter;
}
synchronized public long getDispatchedCounter() {
@@ -230,7 +230,7 @@
if(!dispatching){
dispatching=true;
try{
- for(Iterator iter=matched.iterator();iter.hasNext()&&!isFull();){
+ for(Iterator iter=pending.iterator();iter.hasNext()&&!isFull();){
MessageReference node=(MessageReference) iter.next();
iter.remove();
dispatch(node);
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java?rev=383969&r1=383968&r2=383969&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java Tue Mar 7 11:18:28 2006
@@ -45,8 +45,8 @@
" consumer="+info.getConsumerId()+
", destinations="+destinations.size()+
", dispatched="+dispatched.size()+
- ", delivered="+this.delivered+
- ", matched="+this.matched.size();
+ ", delivered="+this.prefetchExtension+
+ ", pending="+this.pending.size();
}
public void browseDone() throws Exception {
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java?rev=383969&r1=383968&r2=383969&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java Tue Mar 7 11:18:28 2006
@@ -126,8 +126,8 @@
" consumer="+info.getConsumerId()+
", destinations="+destinations.size()+
", dispatched="+dispatched.size()+
- ", delivered="+this.delivered+
- ", matched="+this.matched.size();
+ ", delivered="+this.prefetchExtension+
+ ", pending="+this.pending.size();
}
public int getLockPriority() {