You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2006/12/31 00:49:04 UTC

svn commit: r491346 - in /incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region: PrefetchSubscription.java cursors/TopicStorePrefetch.java

Author: chirino
Date: Sat Dec 30 15:49:03 2006
New Revision: 491346

URL: http://svn.apache.org/viewvc?view=rev&rev=491346
Log:
Fix for CursorDurableTest.
The TopicStorePrefetch was iterating items that were in the subscription but not added to the pending list.

Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?view=diff&rev=491346&r1=491345&r2=491346
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Sat Dec 30 15:49:03 2006
@@ -406,7 +406,9 @@
                             pending.reset();
                             while(pending.hasNext()&&!isFull()&&count<numberToDispatch){
                                 MessageReference node=pending.next();
-                               
+                                if ( node == null )
+                                	break;
+                                
                                 if(canDispatch(node)){
                                     pending.remove();
                                     // Message may have been sitting in the pending list a while

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java?view=diff&rev=491346&r1=491345&r2=491346
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java Sat Dec 30 15:49:03 2006
@@ -20,7 +20,7 @@
 
 import java.io.IOException;
 import java.util.LinkedList;
-import javax.jms.JMSException;
+
 import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.broker.region.MessageReference;
 import org.apache.activemq.broker.region.Topic;
@@ -48,6 +48,10 @@
     private String subscriberName;
     private Destination regionDestination;
 
+    boolean empty=true;
+	private MessageId firstMessageId;
+	private MessageId lastMessageId;
+
     /**
      * @param topic
      * @param clientId
@@ -73,7 +77,7 @@
      * @return true if there are no pending messages
      */
     public boolean isEmpty(){
-        return batchList.isEmpty();
+        return empty;
     }
     
     public synchronized int size(){
@@ -86,27 +90,55 @@
     }
     
     public synchronized void addMessageLast(MessageReference node) throws Exception{
-        if(node!=null){
+		if(node!=null){
+			if( empty ) {
+				firstMessageId = node.getMessageId();
+				empty=false;
+			}
+	        lastMessageId = node.getMessageId();
             node.decrementReferenceCount();
         }
     }
 
-    public synchronized boolean hasNext(){
-        if(isEmpty()){
-            try{
-                fillBatch();
-            }catch(Exception e){
-                log.error("Failed to fill batch",e);
-                throw new RuntimeException(e);
-            }
-        }
+    public synchronized boolean hasNext() {
         return !isEmpty();
     }
 
     public synchronized MessageReference next(){
-        Message result = (Message)batchList.removeFirst();
-        result.setRegionDestination(regionDestination);
-        return result;
+    	    	
+        if( empty ) {
+        	return null;
+        } else {
+
+        	// We may need to fill in the batch...
+            if(batchList.isEmpty()){
+                try{
+                    fillBatch();
+                }catch(Exception e){
+                    log.error("Failed to fill batch",e);
+                    throw new RuntimeException(e);
+                }
+                if( batchList.isEmpty()) {
+                	return null;
+                }
+            }
+
+            Message result = (Message)batchList.removeFirst();
+        	
+        	if( firstMessageId != null ) {
+            	// Skip messages until we get to the first message.
+        		if( !result.getMessageId().equals(firstMessageId) ) 
+        			return null;
+        		firstMessageId = null;
+        	}
+        	if( lastMessageId != null ) {
+        		if( result.getMessageId().equals(lastMessageId) ) {
+        			empty=true;
+        		}
+        	}        	
+            result.setRegionDestination(regionDestination);
+            return result;
+        }
     }
 
     public void reset(){
@@ -130,13 +162,7 @@
 
     // implementation
     protected void fillBatch() throws Exception{
-        store.recoverNextMessages(clientId,subscriberName,
-                maxBatchSize,this);
-        // this will add more messages to the batch list
-        if(!batchList.isEmpty()){
-            Message message=(Message)batchList.getLast();
-          
-        }
+        store.recoverNextMessages(clientId,subscriberName,maxBatchSize,this);
     }
     
     public void gc() {



Re: svn commit: r491346 - in /incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region: PrefetchSubscription.java cursors/TopicStorePrefetch.java

Posted by Rob Davies <ra...@gmail.com>.
On 31 Dec 2006, at 08:32, Hiram Chirino wrote:

> On 12/31/06, Rob Davies <ra...@gmail.com> wrote:
>>
>> On 31 Dec 2006, at 07:59, Hiram Chirino wrote:
>>
>> > On 12/31/06, Rob Davies <ra...@gmail.com> wrote:
>> >> Hey Hiram,
>> >>
>> >> this change breaks  org.apache.activemq.broker.RecoveryBrokerTest,
>> >> oorg.apache.activemq.broker.BrokerTest, etc   for me.
>> >>
>> >
>> > yeah I think I have fix for that. sorry I broke it.  I'm running  
>> the
>> > test suite again now.  Basically I think I need to default boolean
>> > empty=false;  So that an initial recovery of subscription is done.
>> >
>> >> also - I'm not sure I like TopicStorePrefetch possibly  
>> returning null
>> >> when a hasNext() has returned true
>> >>
>> >
>> > Yeah me neither :)  I did not fully understand why it was returning
>> > null when I expected it to return a value.  I was thinking it  
>> could be
>> > a timing issue with the MessageStore.
>> >
>> >> What was the problem in CursorDurableTest ? I hadn't seen that one
>> >>
>> >
>> > CursorDurableTest had a test that was failing due to out of
>> > order/duplicates showing up.  This was cause sometimes some  
>> messages
>> > were direct dispatched and at other times they are dispatched  
>> from the
>> > pending list.  But since the pending list's .next() was  
>> returning the
>> > items that were directly dispatched and not even added to the  
>> pending
>> > list.  This is when the dups and out of order issues would show up.
>> >
>> > The problem is that TopicStorePrefetch.next() was returning  
>> everything
>> > added to the durable subscription since it's backed by the
>> > MessageStore.  And that's not what we want.  We only want it to  
>> return
>> > things that are explicitly added to it since it's the pending list.
>>
>> I wonder if the real problem is then in PrefetchSubscription.add() -
>> because only if pending is empty (nothing in the store) should it
>> dispatch directly
>
> Could be an interaction.  I think TopicStorePrefetch still needs a
> little more work.  I think we need to recover the TopicStorePrefetch
> when the the durable subscription is created so that way we know if it
> is initially empty or not.

it does (or did) - when the subscriber is activated - it sees how  
many messages are outstanding in the store
>
>> >
>> >
>> >> cheers,
>> >>
>> >> Rob
>> >>
>> >> On 30 Dec 2006, at 23:49, chirino@apache.org wrote:
>> >>
>> >> > Author: chirino
>> >> > Date: Sat Dec 30 15:49:03 2006
>> >> > New Revision: 491346
>> >> >
>> >> > URL: http://svn.apache.org/viewvc?view=rev&rev=491346
>> >> > Log:
>> >> > Fix for CursorDurableTest.
>> >> > The TopicStorePrefetch was iterating items that were in the
>> >> > subscription but not added to the pending list.
>> >> >
>> >> > Modified:
>> >> >     incubator/activemq/trunk/activemq-core/src/main/java/org/
>> >> apache/
>> >> > activemq/broker/region/PrefetchSubscription.java
>> >> >     incubator/activemq/trunk/activemq-core/src/main/java/org/
>> >> apache/
>> >> > activemq/broker/region/cursors/TopicStorePrefetch.java
>> >> >
>> >> > Modified: incubator/activemq/trunk/activemq-core/src/main/ 
>> java/org/
>> >> > apache/activemq/broker/region/PrefetchSubscription.java
>> >> > URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/
>> >> activemq-
>> >> > core/src/main/java/org/apache/activemq/broker/region/
>> >> > PrefetchSubscription.java? 
>> view=diff&rev=491346&r1=491345&r2=491346
>> >> >
>> >>  
>> =====================================================================
>> >> =
>> >> > ========
>> >> > --- incubator/activemq/trunk/activemq-core/src/main/java/org/
>> >> apache/
>> >> > activemq/broker/region/PrefetchSubscription.java (original)
>> >> > +++ incubator/activemq/trunk/activemq-core/src/main/java/org/
>> >> apache/
>> >> > activemq/broker/region/PrefetchSubscription.java Sat Dec 30
>> >> > 15:49:03 2006
>> >> > @@ -406,7 +406,9 @@
>> >> >                              pending.reset();
>> >> >                              while(pending.hasNext()&&!isFull()
>> >> > &&count<numberToDispatch){
>> >> >                                  MessageReference
>> >> node=pending.next();
>> >> > -
>> >> > +                                if ( node == null )
>> >> > +                                     break;
>> >> > +
>> >> >                                  if(canDispatch(node)){
>> >> >                                      pending.remove();
>> >> >                                      // Message may have been
>> >> > sitting in the pending list a while
>> >> >
>> >> > Modified: incubator/activemq/trunk/activemq-core/src/main/ 
>> java/org/
>> >> > apache/activemq/broker/region/cursors/TopicStorePrefetch.java
>> >> > URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/
>> >> activemq-
>> >> > core/src/main/java/org/apache/activemq/broker/region/cursors/
>> >> > TopicStorePrefetch.java?view=diff&rev=491346&r1=491345&r2=491346
>> >> >
>> >>  
>> =====================================================================
>> >> =
>> >> > ========
>> >> > --- incubator/activemq/trunk/activemq-core/src/main/java/org/
>> >> apache/
>> >> > activemq/broker/region/cursors/TopicStorePrefetch.java  
>> (original)
>> >> > +++ incubator/activemq/trunk/activemq-core/src/main/java/org/
>> >> apache/
>> >> > activemq/broker/region/cursors/TopicStorePrefetch.java Sat  
>> Dec 30
>> >> > 15:49:03 2006
>> >> > @@ -20,7 +20,7 @@
>> >> >
>> >> >  import java.io.IOException;
>> >> >  import java.util.LinkedList;
>> >> > -import javax.jms.JMSException;
>> >> > +
>> >> >  import org.apache.activemq.broker.region.Destination;
>> >> >  import org.apache.activemq.broker.region.MessageReference;
>> >> >  import org.apache.activemq.broker.region.Topic;
>> >> > @@ -48,6 +48,10 @@
>> >> >      private String subscriberName;
>> >> >      private Destination regionDestination;
>> >> >
>> >> > +    boolean empty=true;
>> >> > +     private MessageId firstMessageId;
>> >> > +     private MessageId lastMessageId;
>> >> > +
>> >> >      /**
>> >> >       * @param topic
>> >> >       * @param clientId
>> >> > @@ -73,7 +77,7 @@
>> >> >       * @return true if there are no pending messages
>> >> >       */
>> >> >      public boolean isEmpty(){
>> >> > -        return batchList.isEmpty();
>> >> > +        return empty;
>> >> >      }
>> >> >
>> >> >      public synchronized int size(){
>> >> > @@ -86,27 +90,55 @@
>> >> >      }
>> >> >
>> >> >      public synchronized void addMessageLast(MessageReference  
>> node)
>> >> > throws Exception{
>> >> > -        if(node!=null){
>> >> > +             if(node!=null){
>> >> > +                     if( empty ) {
>> >> > +                             firstMessageId =  
>> node.getMessageId();
>> >> > +                             empty=false;
>> >> > +                     }
>> >> > +             lastMessageId = node.getMessageId();
>> >> >              node.decrementReferenceCount();
>> >> >          }
>> >> >      }
>> >> >
>> >> > -    public synchronized boolean hasNext(){
>> >> > -        if(isEmpty()){
>> >> > -            try{
>> >> > -                fillBatch();
>> >> > -            }catch(Exception e){
>> >> > -                log.error("Failed to fill batch",e);
>> >> > -                throw new RuntimeException(e);
>> >> > -            }
>> >> > -        }
>> >> > +    public synchronized boolean hasNext() {
>> >> >          return !isEmpty();
>> >> >      }
>> >> >
>> >> >      public synchronized MessageReference next(){
>> >> > -        Message result = (Message)batchList.removeFirst();
>> >> > -        result.setRegionDestination(regionDestination);
>> >> > -        return result;
>> >> > +
>> >> > +        if( empty ) {
>> >> > +             return null;
>> >> > +        } else {
>> >> > +
>> >> > +             // We may need to fill in the batch...
>> >> > +            if(batchList.isEmpty()){
>> >> > +                try{
>> >> > +                    fillBatch();
>> >> > +                }catch(Exception e){
>> >> > +                    log.error("Failed to fill batch",e);
>> >> > +                    throw new RuntimeException(e);
>> >> > +                }
>> >> > +                if( batchList.isEmpty()) {
>> >> > +                     return null;
>> >> > +                }
>> >> > +            }
>> >> > +
>> >> > +            Message result = (Message)batchList.removeFirst();
>> >> > +
>> >> > +             if( firstMessageId != null ) {
>> >> > +             // Skip messages until we get to the first  
>> message.
>> >> > +                     if( !result.getMessageId().equals
>> >> (firstMessageId) )
>> >> > +                             return null;
>> >> > +                     firstMessageId = null;
>> >> > +             }
>> >> > +             if( lastMessageId != null ) {
>> >> > +                     if( result.getMessageId().equals
>> >> (lastMessageId) ) {
>> >> > +                             empty=true;
>> >> > +                     }
>> >> > +             }
>> >> > +            result.setRegionDestination(regionDestination);
>> >> > +            return result;
>> >> > +        }
>> >> >      }
>> >> >
>> >> >      public void reset(){
>> >> > @@ -130,13 +162,7 @@
>> >> >
>> >> >      // implementation
>> >> >      protected void fillBatch() throws Exception{
>> >> > -        store.recoverNextMessages(clientId,subscriberName,
>> >> > -                maxBatchSize,this);
>> >> > -        // this will add more messages to the batch list
>> >> > -        if(!batchList.isEmpty()){
>> >> > -            Message message=(Message)batchList.getLast();
>> >> > -
>> >> > -        }
>> >> > +        store.recoverNextMessages
>> >> > (clientId,subscriberName,maxBatchSize,this);
>> >> >      }
>> >> >
>> >> >      public void gc() {
>> >> >
>> >> >
>> >>
>> >>
>> >
>> >
>> > --
>> > Regards,
>> > Hiram
>> >
>> > Blog: http://hiramchirino.com
>>
>>
>
>
> -- 
> Regards,
> Hiram
>
> Blog: http://hiramchirino.com


Re: svn commit: r491346 - in /incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region: PrefetchSubscription.java cursors/TopicStorePrefetch.java

Posted by Hiram Chirino <hi...@hiramchirino.com>.
On 12/31/06, Rob Davies <ra...@gmail.com> wrote:
>
> On 31 Dec 2006, at 07:59, Hiram Chirino wrote:
>
> > On 12/31/06, Rob Davies <ra...@gmail.com> wrote:
> >> Hey Hiram,
> >>
> >> this change breaks  org.apache.activemq.broker.RecoveryBrokerTest,
> >> oorg.apache.activemq.broker.BrokerTest, etc   for me.
> >>
> >
> > yeah I think I have fix for that. sorry I broke it.  I'm running the
> > test suite again now.  Basically I think I need to default boolean
> > empty=false;  So that an initial recovery of subscription is done.
> >
> >> also - I'm not sure I like TopicStorePrefetch possibly returning null
> >> when a hasNext() has returned true
> >>
> >
> > Yeah me neither :)  I did not fully understand why it was returning
> > null when I expected it to return a value.  I was thinking it could be
> > a timing issue with the MessageStore.
> >
> >> What was the problem in CursorDurableTest ? I hadn't seen that one
> >>
> >
> > CursorDurableTest had a test that was failing due to out of
> > order/duplicates showing up.  This was cause sometimes some messages
> > were direct dispatched and at other times they are dispatched from the
> > pending list.  But since the pending list's .next() was returning the
> > items that were directly dispatched and not even added to the pending
> > list.  This is when the dups and out of order issues would show up.
> >
> > The problem is that TopicStorePrefetch.next() was returning everything
> > added to the durable subscription since it's backed by the
> > MessageStore.  And that's not what we want.  We only want it to return
> > things that are explicitly added to it since it's the pending list.
>
> I wonder if the real problem is then in PrefetchSubscription.add() -
> because only if pending is empty (nothing in the store) should it
> dispatch directly

Could be an interaction.  I think TopicStorePrefetch still needs a
little more work.  I think we need to recover the TopicStorePrefetch
when the the durable subscription is created so that way we know if it
is initially empty or not.

> >
> >
> >> cheers,
> >>
> >> Rob
> >>
> >> On 30 Dec 2006, at 23:49, chirino@apache.org wrote:
> >>
> >> > Author: chirino
> >> > Date: Sat Dec 30 15:49:03 2006
> >> > New Revision: 491346
> >> >
> >> > URL: http://svn.apache.org/viewvc?view=rev&rev=491346
> >> > Log:
> >> > Fix for CursorDurableTest.
> >> > The TopicStorePrefetch was iterating items that were in the
> >> > subscription but not added to the pending list.
> >> >
> >> > Modified:
> >> >     incubator/activemq/trunk/activemq-core/src/main/java/org/
> >> apache/
> >> > activemq/broker/region/PrefetchSubscription.java
> >> >     incubator/activemq/trunk/activemq-core/src/main/java/org/
> >> apache/
> >> > activemq/broker/region/cursors/TopicStorePrefetch.java
> >> >
> >> > Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/
> >> > apache/activemq/broker/region/PrefetchSubscription.java
> >> > URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/
> >> activemq-
> >> > core/src/main/java/org/apache/activemq/broker/region/
> >> > PrefetchSubscription.java?view=diff&rev=491346&r1=491345&r2=491346
> >> >
> >> =====================================================================
> >> =
> >> > ========
> >> > --- incubator/activemq/trunk/activemq-core/src/main/java/org/
> >> apache/
> >> > activemq/broker/region/PrefetchSubscription.java (original)
> >> > +++ incubator/activemq/trunk/activemq-core/src/main/java/org/
> >> apache/
> >> > activemq/broker/region/PrefetchSubscription.java Sat Dec 30
> >> > 15:49:03 2006
> >> > @@ -406,7 +406,9 @@
> >> >                              pending.reset();
> >> >                              while(pending.hasNext()&&!isFull()
> >> > &&count<numberToDispatch){
> >> >                                  MessageReference
> >> node=pending.next();
> >> > -
> >> > +                                if ( node == null )
> >> > +                                     break;
> >> > +
> >> >                                  if(canDispatch(node)){
> >> >                                      pending.remove();
> >> >                                      // Message may have been
> >> > sitting in the pending list a while
> >> >
> >> > Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/
> >> > apache/activemq/broker/region/cursors/TopicStorePrefetch.java
> >> > URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/
> >> activemq-
> >> > core/src/main/java/org/apache/activemq/broker/region/cursors/
> >> > TopicStorePrefetch.java?view=diff&rev=491346&r1=491345&r2=491346
> >> >
> >> =====================================================================
> >> =
> >> > ========
> >> > --- incubator/activemq/trunk/activemq-core/src/main/java/org/
> >> apache/
> >> > activemq/broker/region/cursors/TopicStorePrefetch.java (original)
> >> > +++ incubator/activemq/trunk/activemq-core/src/main/java/org/
> >> apache/
> >> > activemq/broker/region/cursors/TopicStorePrefetch.java Sat Dec 30
> >> > 15:49:03 2006
> >> > @@ -20,7 +20,7 @@
> >> >
> >> >  import java.io.IOException;
> >> >  import java.util.LinkedList;
> >> > -import javax.jms.JMSException;
> >> > +
> >> >  import org.apache.activemq.broker.region.Destination;
> >> >  import org.apache.activemq.broker.region.MessageReference;
> >> >  import org.apache.activemq.broker.region.Topic;
> >> > @@ -48,6 +48,10 @@
> >> >      private String subscriberName;
> >> >      private Destination regionDestination;
> >> >
> >> > +    boolean empty=true;
> >> > +     private MessageId firstMessageId;
> >> > +     private MessageId lastMessageId;
> >> > +
> >> >      /**
> >> >       * @param topic
> >> >       * @param clientId
> >> > @@ -73,7 +77,7 @@
> >> >       * @return true if there are no pending messages
> >> >       */
> >> >      public boolean isEmpty(){
> >> > -        return batchList.isEmpty();
> >> > +        return empty;
> >> >      }
> >> >
> >> >      public synchronized int size(){
> >> > @@ -86,27 +90,55 @@
> >> >      }
> >> >
> >> >      public synchronized void addMessageLast(MessageReference node)
> >> > throws Exception{
> >> > -        if(node!=null){
> >> > +             if(node!=null){
> >> > +                     if( empty ) {
> >> > +                             firstMessageId = node.getMessageId();
> >> > +                             empty=false;
> >> > +                     }
> >> > +             lastMessageId = node.getMessageId();
> >> >              node.decrementReferenceCount();
> >> >          }
> >> >      }
> >> >
> >> > -    public synchronized boolean hasNext(){
> >> > -        if(isEmpty()){
> >> > -            try{
> >> > -                fillBatch();
> >> > -            }catch(Exception e){
> >> > -                log.error("Failed to fill batch",e);
> >> > -                throw new RuntimeException(e);
> >> > -            }
> >> > -        }
> >> > +    public synchronized boolean hasNext() {
> >> >          return !isEmpty();
> >> >      }
> >> >
> >> >      public synchronized MessageReference next(){
> >> > -        Message result = (Message)batchList.removeFirst();
> >> > -        result.setRegionDestination(regionDestination);
> >> > -        return result;
> >> > +
> >> > +        if( empty ) {
> >> > +             return null;
> >> > +        } else {
> >> > +
> >> > +             // We may need to fill in the batch...
> >> > +            if(batchList.isEmpty()){
> >> > +                try{
> >> > +                    fillBatch();
> >> > +                }catch(Exception e){
> >> > +                    log.error("Failed to fill batch",e);
> >> > +                    throw new RuntimeException(e);
> >> > +                }
> >> > +                if( batchList.isEmpty()) {
> >> > +                     return null;
> >> > +                }
> >> > +            }
> >> > +
> >> > +            Message result = (Message)batchList.removeFirst();
> >> > +
> >> > +             if( firstMessageId != null ) {
> >> > +             // Skip messages until we get to the first message.
> >> > +                     if( !result.getMessageId().equals
> >> (firstMessageId) )
> >> > +                             return null;
> >> > +                     firstMessageId = null;
> >> > +             }
> >> > +             if( lastMessageId != null ) {
> >> > +                     if( result.getMessageId().equals
> >> (lastMessageId) ) {
> >> > +                             empty=true;
> >> > +                     }
> >> > +             }
> >> > +            result.setRegionDestination(regionDestination);
> >> > +            return result;
> >> > +        }
> >> >      }
> >> >
> >> >      public void reset(){
> >> > @@ -130,13 +162,7 @@
> >> >
> >> >      // implementation
> >> >      protected void fillBatch() throws Exception{
> >> > -        store.recoverNextMessages(clientId,subscriberName,
> >> > -                maxBatchSize,this);
> >> > -        // this will add more messages to the batch list
> >> > -        if(!batchList.isEmpty()){
> >> > -            Message message=(Message)batchList.getLast();
> >> > -
> >> > -        }
> >> > +        store.recoverNextMessages
> >> > (clientId,subscriberName,maxBatchSize,this);
> >> >      }
> >> >
> >> >      public void gc() {
> >> >
> >> >
> >>
> >>
> >
> >
> > --
> > Regards,
> > Hiram
> >
> > Blog: http://hiramchirino.com
>
>


-- 
Regards,
Hiram

Blog: http://hiramchirino.com

Re: svn commit: r491346 - in /incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region: PrefetchSubscription.java cursors/TopicStorePrefetch.java

Posted by Rob Davies <ra...@gmail.com>.
On 31 Dec 2006, at 07:59, Hiram Chirino wrote:

> On 12/31/06, Rob Davies <ra...@gmail.com> wrote:
>> Hey Hiram,
>>
>> this change breaks  org.apache.activemq.broker.RecoveryBrokerTest,
>> oorg.apache.activemq.broker.BrokerTest, etc   for me.
>>
>
> yeah I think I have fix for that. sorry I broke it.  I'm running the
> test suite again now.  Basically I think I need to default boolean
> empty=false;  So that an initial recovery of subscription is done.
>
>> also - I'm not sure I like TopicStorePrefetch possibly returning null
>> when a hasNext() has returned true
>>
>
> Yeah me neither :)  I did not fully understand why it was returning
> null when I expected it to return a value.  I was thinking it could be
> a timing issue with the MessageStore.
>
>> What was the problem in CursorDurableTest ? I hadn't seen that one
>>
>
> CursorDurableTest had a test that was failing due to out of
> order/duplicates showing up.  This was cause sometimes some messages
> were direct dispatched and at other times they are dispatched from the
> pending list.  But since the pending list's .next() was returning the
> items that were directly dispatched and not even added to the pending
> list.  This is when the dups and out of order issues would show up.
>
> The problem is that TopicStorePrefetch.next() was returning everything
> added to the durable subscription since it's backed by the
> MessageStore.  And that's not what we want.  We only want it to return
> things that are explicitly added to it since it's the pending list.

I wonder if the real problem is then in PrefetchSubscription.add() -  
because only if pending is empty (nothing in the store) should it  
dispatch directly
>
>
>> cheers,
>>
>> Rob
>>
>> On 30 Dec 2006, at 23:49, chirino@apache.org wrote:
>>
>> > Author: chirino
>> > Date: Sat Dec 30 15:49:03 2006
>> > New Revision: 491346
>> >
>> > URL: http://svn.apache.org/viewvc?view=rev&rev=491346
>> > Log:
>> > Fix for CursorDurableTest.
>> > The TopicStorePrefetch was iterating items that were in the
>> > subscription but not added to the pending list.
>> >
>> > Modified:
>> >     incubator/activemq/trunk/activemq-core/src/main/java/org/ 
>> apache/
>> > activemq/broker/region/PrefetchSubscription.java
>> >     incubator/activemq/trunk/activemq-core/src/main/java/org/ 
>> apache/
>> > activemq/broker/region/cursors/TopicStorePrefetch.java
>> >
>> > Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/
>> > apache/activemq/broker/region/PrefetchSubscription.java
>> > URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/ 
>> activemq-
>> > core/src/main/java/org/apache/activemq/broker/region/
>> > PrefetchSubscription.java?view=diff&rev=491346&r1=491345&r2=491346
>> >  
>> ===================================================================== 
>> =
>> > ========
>> > --- incubator/activemq/trunk/activemq-core/src/main/java/org/ 
>> apache/
>> > activemq/broker/region/PrefetchSubscription.java (original)
>> > +++ incubator/activemq/trunk/activemq-core/src/main/java/org/ 
>> apache/
>> > activemq/broker/region/PrefetchSubscription.java Sat Dec 30
>> > 15:49:03 2006
>> > @@ -406,7 +406,9 @@
>> >                              pending.reset();
>> >                              while(pending.hasNext()&&!isFull()
>> > &&count<numberToDispatch){
>> >                                  MessageReference  
>> node=pending.next();
>> > -
>> > +                                if ( node == null )
>> > +                                     break;
>> > +
>> >                                  if(canDispatch(node)){
>> >                                      pending.remove();
>> >                                      // Message may have been
>> > sitting in the pending list a while
>> >
>> > Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/
>> > apache/activemq/broker/region/cursors/TopicStorePrefetch.java
>> > URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/ 
>> activemq-
>> > core/src/main/java/org/apache/activemq/broker/region/cursors/
>> > TopicStorePrefetch.java?view=diff&rev=491346&r1=491345&r2=491346
>> >  
>> ===================================================================== 
>> =
>> > ========
>> > --- incubator/activemq/trunk/activemq-core/src/main/java/org/ 
>> apache/
>> > activemq/broker/region/cursors/TopicStorePrefetch.java (original)
>> > +++ incubator/activemq/trunk/activemq-core/src/main/java/org/ 
>> apache/
>> > activemq/broker/region/cursors/TopicStorePrefetch.java Sat Dec 30
>> > 15:49:03 2006
>> > @@ -20,7 +20,7 @@
>> >
>> >  import java.io.IOException;
>> >  import java.util.LinkedList;
>> > -import javax.jms.JMSException;
>> > +
>> >  import org.apache.activemq.broker.region.Destination;
>> >  import org.apache.activemq.broker.region.MessageReference;
>> >  import org.apache.activemq.broker.region.Topic;
>> > @@ -48,6 +48,10 @@
>> >      private String subscriberName;
>> >      private Destination regionDestination;
>> >
>> > +    boolean empty=true;
>> > +     private MessageId firstMessageId;
>> > +     private MessageId lastMessageId;
>> > +
>> >      /**
>> >       * @param topic
>> >       * @param clientId
>> > @@ -73,7 +77,7 @@
>> >       * @return true if there are no pending messages
>> >       */
>> >      public boolean isEmpty(){
>> > -        return batchList.isEmpty();
>> > +        return empty;
>> >      }
>> >
>> >      public synchronized int size(){
>> > @@ -86,27 +90,55 @@
>> >      }
>> >
>> >      public synchronized void addMessageLast(MessageReference node)
>> > throws Exception{
>> > -        if(node!=null){
>> > +             if(node!=null){
>> > +                     if( empty ) {
>> > +                             firstMessageId = node.getMessageId();
>> > +                             empty=false;
>> > +                     }
>> > +             lastMessageId = node.getMessageId();
>> >              node.decrementReferenceCount();
>> >          }
>> >      }
>> >
>> > -    public synchronized boolean hasNext(){
>> > -        if(isEmpty()){
>> > -            try{
>> > -                fillBatch();
>> > -            }catch(Exception e){
>> > -                log.error("Failed to fill batch",e);
>> > -                throw new RuntimeException(e);
>> > -            }
>> > -        }
>> > +    public synchronized boolean hasNext() {
>> >          return !isEmpty();
>> >      }
>> >
>> >      public synchronized MessageReference next(){
>> > -        Message result = (Message)batchList.removeFirst();
>> > -        result.setRegionDestination(regionDestination);
>> > -        return result;
>> > +
>> > +        if( empty ) {
>> > +             return null;
>> > +        } else {
>> > +
>> > +             // We may need to fill in the batch...
>> > +            if(batchList.isEmpty()){
>> > +                try{
>> > +                    fillBatch();
>> > +                }catch(Exception e){
>> > +                    log.error("Failed to fill batch",e);
>> > +                    throw new RuntimeException(e);
>> > +                }
>> > +                if( batchList.isEmpty()) {
>> > +                     return null;
>> > +                }
>> > +            }
>> > +
>> > +            Message result = (Message)batchList.removeFirst();
>> > +
>> > +             if( firstMessageId != null ) {
>> > +             // Skip messages until we get to the first message.
>> > +                     if( !result.getMessageId().equals 
>> (firstMessageId) )
>> > +                             return null;
>> > +                     firstMessageId = null;
>> > +             }
>> > +             if( lastMessageId != null ) {
>> > +                     if( result.getMessageId().equals 
>> (lastMessageId) ) {
>> > +                             empty=true;
>> > +                     }
>> > +             }
>> > +            result.setRegionDestination(regionDestination);
>> > +            return result;
>> > +        }
>> >      }
>> >
>> >      public void reset(){
>> > @@ -130,13 +162,7 @@
>> >
>> >      // implementation
>> >      protected void fillBatch() throws Exception{
>> > -        store.recoverNextMessages(clientId,subscriberName,
>> > -                maxBatchSize,this);
>> > -        // this will add more messages to the batch list
>> > -        if(!batchList.isEmpty()){
>> > -            Message message=(Message)batchList.getLast();
>> > -
>> > -        }
>> > +        store.recoverNextMessages
>> > (clientId,subscriberName,maxBatchSize,this);
>> >      }
>> >
>> >      public void gc() {
>> >
>> >
>>
>>
>
>
> -- 
> Regards,
> Hiram
>
> Blog: http://hiramchirino.com


Re: svn commit: r491346 - in /incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region: PrefetchSubscription.java cursors/TopicStorePrefetch.java

Posted by Hiram Chirino <hi...@hiramchirino.com>.
On 12/31/06, Rob Davies <ra...@gmail.com> wrote:
> Hey Hiram,
>
> this change breaks  org.apache.activemq.broker.RecoveryBrokerTest,
> oorg.apache.activemq.broker.BrokerTest, etc   for me.
>

yeah I think I have fix for that. sorry I broke it.  I'm running the
test suite again now.  Basically I think I need to default boolean
empty=false;  So that an initial recovery of subscription is done.

> also - I'm not sure I like TopicStorePrefetch possibly returning null
> when a hasNext() has returned true
>

Yeah me neither :)  I did not fully understand why it was returning
null when I expected it to return a value.  I was thinking it could be
a timing issue with the MessageStore.

> What was the problem in CursorDurableTest ? I hadn't seen that one
>

CursorDurableTest had a test that was failing due to out of
order/duplicates showing up.  This was cause sometimes some messages
were direct dispatched and at other times they are dispatched from the
pending list.  But since the pending list's .next() was returning the
items that were directly dispatched and not even added to the pending
list.  This is when the dups and out of order issues would show up.

The problem is that TopicStorePrefetch.next() was returning everything
added to the durable subscription since it's backed by the
MessageStore.  And that's not what we want.  We only want it to return
things that are explicitly added to it since it's the pending list.


> cheers,
>
> Rob
>
> On 30 Dec 2006, at 23:49, chirino@apache.org wrote:
>
> > Author: chirino
> > Date: Sat Dec 30 15:49:03 2006
> > New Revision: 491346
> >
> > URL: http://svn.apache.org/viewvc?view=rev&rev=491346
> > Log:
> > Fix for CursorDurableTest.
> > The TopicStorePrefetch was iterating items that were in the
> > subscription but not added to the pending list.
> >
> > Modified:
> >     incubator/activemq/trunk/activemq-core/src/main/java/org/apache/
> > activemq/broker/region/PrefetchSubscription.java
> >     incubator/activemq/trunk/activemq-core/src/main/java/org/apache/
> > activemq/broker/region/cursors/TopicStorePrefetch.java
> >
> > Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/
> > apache/activemq/broker/region/PrefetchSubscription.java
> > URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-
> > core/src/main/java/org/apache/activemq/broker/region/
> > PrefetchSubscription.java?view=diff&rev=491346&r1=491345&r2=491346
> > ======================================================================
> > ========
> > --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/
> > activemq/broker/region/PrefetchSubscription.java (original)
> > +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/
> > activemq/broker/region/PrefetchSubscription.java Sat Dec 30
> > 15:49:03 2006
> > @@ -406,7 +406,9 @@
> >                              pending.reset();
> >                              while(pending.hasNext()&&!isFull()
> > &&count<numberToDispatch){
> >                                  MessageReference node=pending.next();
> > -
> > +                                if ( node == null )
> > +                                     break;
> > +
> >                                  if(canDispatch(node)){
> >                                      pending.remove();
> >                                      // Message may have been
> > sitting in the pending list a while
> >
> > Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/
> > apache/activemq/broker/region/cursors/TopicStorePrefetch.java
> > URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-
> > core/src/main/java/org/apache/activemq/broker/region/cursors/
> > TopicStorePrefetch.java?view=diff&rev=491346&r1=491345&r2=491346
> > ======================================================================
> > ========
> > --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/
> > activemq/broker/region/cursors/TopicStorePrefetch.java (original)
> > +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/
> > activemq/broker/region/cursors/TopicStorePrefetch.java Sat Dec 30
> > 15:49:03 2006
> > @@ -20,7 +20,7 @@
> >
> >  import java.io.IOException;
> >  import java.util.LinkedList;
> > -import javax.jms.JMSException;
> > +
> >  import org.apache.activemq.broker.region.Destination;
> >  import org.apache.activemq.broker.region.MessageReference;
> >  import org.apache.activemq.broker.region.Topic;
> > @@ -48,6 +48,10 @@
> >      private String subscriberName;
> >      private Destination regionDestination;
> >
> > +    boolean empty=true;
> > +     private MessageId firstMessageId;
> > +     private MessageId lastMessageId;
> > +
> >      /**
> >       * @param topic
> >       * @param clientId
> > @@ -73,7 +77,7 @@
> >       * @return true if there are no pending messages
> >       */
> >      public boolean isEmpty(){
> > -        return batchList.isEmpty();
> > +        return empty;
> >      }
> >
> >      public synchronized int size(){
> > @@ -86,27 +90,55 @@
> >      }
> >
> >      public synchronized void addMessageLast(MessageReference node)
> > throws Exception{
> > -        if(node!=null){
> > +             if(node!=null){
> > +                     if( empty ) {
> > +                             firstMessageId = node.getMessageId();
> > +                             empty=false;
> > +                     }
> > +             lastMessageId = node.getMessageId();
> >              node.decrementReferenceCount();
> >          }
> >      }
> >
> > -    public synchronized boolean hasNext(){
> > -        if(isEmpty()){
> > -            try{
> > -                fillBatch();
> > -            }catch(Exception e){
> > -                log.error("Failed to fill batch",e);
> > -                throw new RuntimeException(e);
> > -            }
> > -        }
> > +    public synchronized boolean hasNext() {
> >          return !isEmpty();
> >      }
> >
> >      public synchronized MessageReference next(){
> > -        Message result = (Message)batchList.removeFirst();
> > -        result.setRegionDestination(regionDestination);
> > -        return result;
> > +
> > +        if( empty ) {
> > +             return null;
> > +        } else {
> > +
> > +             // We may need to fill in the batch...
> > +            if(batchList.isEmpty()){
> > +                try{
> > +                    fillBatch();
> > +                }catch(Exception e){
> > +                    log.error("Failed to fill batch",e);
> > +                    throw new RuntimeException(e);
> > +                }
> > +                if( batchList.isEmpty()) {
> > +                     return null;
> > +                }
> > +            }
> > +
> > +            Message result = (Message)batchList.removeFirst();
> > +
> > +             if( firstMessageId != null ) {
> > +             // Skip messages until we get to the first message.
> > +                     if( !result.getMessageId().equals(firstMessageId) )
> > +                             return null;
> > +                     firstMessageId = null;
> > +             }
> > +             if( lastMessageId != null ) {
> > +                     if( result.getMessageId().equals(lastMessageId) ) {
> > +                             empty=true;
> > +                     }
> > +             }
> > +            result.setRegionDestination(regionDestination);
> > +            return result;
> > +        }
> >      }
> >
> >      public void reset(){
> > @@ -130,13 +162,7 @@
> >
> >      // implementation
> >      protected void fillBatch() throws Exception{
> > -        store.recoverNextMessages(clientId,subscriberName,
> > -                maxBatchSize,this);
> > -        // this will add more messages to the batch list
> > -        if(!batchList.isEmpty()){
> > -            Message message=(Message)batchList.getLast();
> > -
> > -        }
> > +        store.recoverNextMessages
> > (clientId,subscriberName,maxBatchSize,this);
> >      }
> >
> >      public void gc() {
> >
> >
>
>


-- 
Regards,
Hiram

Blog: http://hiramchirino.com

Re: svn commit: r491346 - in /incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region: PrefetchSubscription.java cursors/TopicStorePrefetch.java

Posted by Rob Davies <ra...@gmail.com>.
Hey Hiram,

this change breaks  org.apache.activemq.broker.RecoveryBrokerTest,  
oorg.apache.activemq.broker.BrokerTest, etc   for me.

also - I'm not sure I like TopicStorePrefetch possibly returning null  
when a hasNext() has returned true

What was the problem in CursorDurableTest ? I hadn't seen that one

cheers,

Rob

On 30 Dec 2006, at 23:49, chirino@apache.org wrote:

> Author: chirino
> Date: Sat Dec 30 15:49:03 2006
> New Revision: 491346
>
> URL: http://svn.apache.org/viewvc?view=rev&rev=491346
> Log:
> Fix for CursorDurableTest.
> The TopicStorePrefetch was iterating items that were in the  
> subscription but not added to the pending list.
>
> Modified:
>     incubator/activemq/trunk/activemq-core/src/main/java/org/apache/ 
> activemq/broker/region/PrefetchSubscription.java
>     incubator/activemq/trunk/activemq-core/src/main/java/org/apache/ 
> activemq/broker/region/cursors/TopicStorePrefetch.java
>
> Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/ 
> apache/activemq/broker/region/PrefetchSubscription.java
> URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq- 
> core/src/main/java/org/apache/activemq/broker/region/ 
> PrefetchSubscription.java?view=diff&rev=491346&r1=491345&r2=491346
> ====================================================================== 
> ========
> --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/ 
> activemq/broker/region/PrefetchSubscription.java (original)
> +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/ 
> activemq/broker/region/PrefetchSubscription.java Sat Dec 30  
> 15:49:03 2006
> @@ -406,7 +406,9 @@
>                              pending.reset();
>                              while(pending.hasNext()&&!isFull() 
> &&count<numberToDispatch){
>                                  MessageReference node=pending.next();
> -
> +                                if ( node == null )
> +                                	break;
> +
>                                  if(canDispatch(node)){
>                                      pending.remove();
>                                      // Message may have been  
> sitting in the pending list a while
>
> Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/ 
> apache/activemq/broker/region/cursors/TopicStorePrefetch.java
> URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq- 
> core/src/main/java/org/apache/activemq/broker/region/cursors/ 
> TopicStorePrefetch.java?view=diff&rev=491346&r1=491345&r2=491346
> ====================================================================== 
> ========
> --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/ 
> activemq/broker/region/cursors/TopicStorePrefetch.java (original)
> +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/ 
> activemq/broker/region/cursors/TopicStorePrefetch.java Sat Dec 30  
> 15:49:03 2006
> @@ -20,7 +20,7 @@
>
>  import java.io.IOException;
>  import java.util.LinkedList;
> -import javax.jms.JMSException;
> +
>  import org.apache.activemq.broker.region.Destination;
>  import org.apache.activemq.broker.region.MessageReference;
>  import org.apache.activemq.broker.region.Topic;
> @@ -48,6 +48,10 @@
>      private String subscriberName;
>      private Destination regionDestination;
>
> +    boolean empty=true;
> +	private MessageId firstMessageId;
> +	private MessageId lastMessageId;
> +
>      /**
>       * @param topic
>       * @param clientId
> @@ -73,7 +77,7 @@
>       * @return true if there are no pending messages
>       */
>      public boolean isEmpty(){
> -        return batchList.isEmpty();
> +        return empty;
>      }
>
>      public synchronized int size(){
> @@ -86,27 +90,55 @@
>      }
>
>      public synchronized void addMessageLast(MessageReference node)  
> throws Exception{
> -        if(node!=null){
> +		if(node!=null){
> +			if( empty ) {
> +				firstMessageId = node.getMessageId();
> +				empty=false;
> +			}
> +	        lastMessageId = node.getMessageId();
>              node.decrementReferenceCount();
>          }
>      }
>
> -    public synchronized boolean hasNext(){
> -        if(isEmpty()){
> -            try{
> -                fillBatch();
> -            }catch(Exception e){
> -                log.error("Failed to fill batch",e);
> -                throw new RuntimeException(e);
> -            }
> -        }
> +    public synchronized boolean hasNext() {
>          return !isEmpty();
>      }
>
>      public synchronized MessageReference next(){
> -        Message result = (Message)batchList.removeFirst();
> -        result.setRegionDestination(regionDestination);
> -        return result;
> +    	    	
> +        if( empty ) {
> +        	return null;
> +        } else {
> +
> +        	// We may need to fill in the batch...
> +            if(batchList.isEmpty()){
> +                try{
> +                    fillBatch();
> +                }catch(Exception e){
> +                    log.error("Failed to fill batch",e);
> +                    throw new RuntimeException(e);
> +                }
> +                if( batchList.isEmpty()) {
> +                	return null;
> +                }
> +            }
> +
> +            Message result = (Message)batchList.removeFirst();
> +        	
> +        	if( firstMessageId != null ) {
> +            	// Skip messages until we get to the first message.
> +        		if( !result.getMessageId().equals(firstMessageId) )
> +        			return null;
> +        		firstMessageId = null;
> +        	}
> +        	if( lastMessageId != null ) {
> +        		if( result.getMessageId().equals(lastMessageId) ) {
> +        			empty=true;
> +        		}
> +        	}        	
> +            result.setRegionDestination(regionDestination);
> +            return result;
> +        }
>      }
>
>      public void reset(){
> @@ -130,13 +162,7 @@
>
>      // implementation
>      protected void fillBatch() throws Exception{
> -        store.recoverNextMessages(clientId,subscriberName,
> -                maxBatchSize,this);
> -        // this will add more messages to the batch list
> -        if(!batchList.isEmpty()){
> -            Message message=(Message)batchList.getLast();
> -
> -        }
> +        store.recoverNextMessages 
> (clientId,subscriberName,maxBatchSize,this);
>      }
>
>      public void gc() {
>
>