You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2006/10/09 21:38:18 UTC

svn commit: r454471 - in /incubator/activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/region/cursors/ test/java/org/apache/activemq/perf/

Author: rajdavies
Date: Mon Oct  9 12:38:17 2006
New Revision: 454471

URL: http://svn.apache.org/viewvc?view=rev&rev=454471
Log:
Ensure messages inform the usage manager when they move in and out of RAM ...

Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/InactiveDurableTopicTest.java

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java?view=diff&rev=454471&r1=454470&r2=454471
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java Mon Oct  9 12:38:17 2006
@@ -134,23 +134,26 @@
     }
 
     public synchronized void addMessageLast(MessageReference node) throws Exception{
-        if(started){
-            if(node!=null){
-                Message msg=node.getMessage();
+        if(node!=null){
+            Message msg=node.getMessage();
+            if(started){
+                pendingCount++;
                 if(!msg.isPersistent()){
                     nonPersistent.addMessageLast(node);
-                }else{
-                    Destination dest=msg.getRegionDestination();
-                    TopicStorePrefetch tsp=(TopicStorePrefetch)topics.get(dest);
-                    if(tsp!=null){
-                        tsp.addMessageLast(node);
+                }
+            }
+            if(msg.isPersistent()){
+                Destination dest=msg.getRegionDestination();
+                TopicStorePrefetch tsp=(TopicStorePrefetch)topics.get(dest);
+                if(tsp!=null){
+                    tsp.addMessageLast(node);
+                    if(started){
                         // if the store has been empty - then this message is next to dispatch
                         if((pendingCount-nonPersistent.size())<=0){
                             tsp.nextToDispatch(node.getMessageId());
                         }
                     }
                 }
-                pendingCount++;
             }
         }
     }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java?view=diff&rev=454471&r1=454470&r2=454471
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java Mon Oct  9 12:38:17 2006
@@ -84,6 +84,12 @@
             throw new RuntimeException(e);
         }
     }
+    
+    public synchronized void addMessageLast(MessageReference node) throws Exception{
+        if(node!=null){
+            node.decrementReferenceCount();
+        }
+    }
 
     public synchronized boolean hasNext(){
         if(isEmpty()){
@@ -112,6 +118,7 @@
 
     public void recoverMessage(Message message) throws Exception{
         message.setRegionDestination(regionDestination);
+        message.incrementReferenceCount();
         batchList.addLast(message);
     }
 

Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/InactiveDurableTopicTest.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/InactiveDurableTopicTest.java?view=diff&rev=454471&r1=454470&r2=454471
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/InactiveDurableTopicTest.java (original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/InactiveDurableTopicTest.java Mon Oct  9 12:38:17 2006
@@ -34,6 +34,7 @@
  * @version $Revision$
  */
 public class InactiveDurableTopicTest extends TestCase{
+    private static final int MESSAGE_COUNT = 100000;
     private static final String DEFAULT_PASSWORD="";
     private static final String USERNAME="testuser";
     private static final String CLIENTID="mytestclient";
@@ -53,7 +54,7 @@
         super.setUp();
         broker=new BrokerService();
         
-        //broker.setPersistenceAdapter(new KahaPersistenceAdapter(new File ("TEST_STUFD")));
+        broker.setPersistenceAdapter(new KahaPersistenceAdapter(new File ("TEST_STUFD")));
         /*
         DefaultPersistenceAdapterFactory factory = new DefaultPersistenceAdapterFactory();
         factory.setDataDirectoryFile(broker.getDataDirectory());
@@ -84,6 +85,7 @@
             connection=connectionFactory.createConnection(USERNAME,DEFAULT_PASSWORD);
             assertNotNull(connection);
             connection.setClientID(CLIENTID);
+            connection.start();
             session=connection.createSession(false,javax.jms.Session.CLIENT_ACKNOWLEDGE);
             assertNotNull(session);
             topic=session.createTopic(TOPIC_NAME);
@@ -119,18 +121,51 @@
             assertNotNull(msg);
             msg.setString("key1","value1");
             int loop;
-            for(loop=0;loop<100000;loop++){
+            for(loop=0;loop<MESSAGE_COUNT;loop++){
                 msg.setInt("key2",loop);
                 publisher.send(msg,deliveryMode,deliveryPriority,Message.DEFAULT_TIME_TO_LIVE);
                 if (loop%500==0){
                     System.out.println("Sent " + loop + " messages");
                 }
             }
-            this.assertEquals(loop,100000);
+            this.assertEquals(loop,MESSAGE_COUNT);
             publisher.close();
             session.close();
             connection.stop();
             connection.stop();
+        }catch(JMSException ex){
+            try{
+                connection.close();
+            }catch(Exception ignore){}
+            throw new AssertionFailedError("Create Subscription caught: "+ex);
+        }
+    }
+    public void test3CreateSubscription() throws Exception{
+        try{
+            /*
+             * Step 1 - Establish a connection with a client id and create a durable subscription
+             */
+            connection=connectionFactory.createConnection(USERNAME,DEFAULT_PASSWORD);
+            assertNotNull(connection);
+            connection.setClientID(CLIENTID);
+            connection.start();
+            session=connection.createSession(false,javax.jms.Session.AUTO_ACKNOWLEDGE);
+            assertNotNull(session);
+            topic=session.createTopic(TOPIC_NAME);
+            assertNotNull(topic);
+            subscriber=session.createDurableSubscriber(topic,SUBID,"",false);
+            assertNotNull(subscriber);
+            int loop;
+            for(loop=0;loop<MESSAGE_COUNT;loop++){
+                Message msg = subscriber.receive();
+                if (loop%500==0){
+                    System.out.println("Received " + loop + " messages");
+                }
+            }
+            this.assertEquals(loop,MESSAGE_COUNT);
+            subscriber.close();
+            session.close();
+            connection.close();
         }catch(JMSException ex){
             try{
                 connection.close();