You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2006/10/09 21:38:18 UTC
svn commit: r454471 - in /incubator/activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/broker/region/cursors/
test/java/org/apache/activemq/perf/
Author: rajdavies
Date: Mon Oct 9 12:38:17 2006
New Revision: 454471
URL: http://svn.apache.org/viewvc?view=rev&rev=454471
Log:
Ensure messages inform the usage manager when they move in and out of RAM.
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/InactiveDurableTopicTest.java
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java?view=diff&rev=454471&r1=454470&r2=454471
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java Mon Oct 9 12:38:17 2006
@@ -134,23 +134,26 @@
}
public synchronized void addMessageLast(MessageReference node) throws Exception{
- if(started){
- if(node!=null){
- Message msg=node.getMessage();
+ if(node!=null){
+ Message msg=node.getMessage();
+ if(started){
+ pendingCount++;
if(!msg.isPersistent()){
nonPersistent.addMessageLast(node);
- }else{
- Destination dest=msg.getRegionDestination();
- TopicStorePrefetch tsp=(TopicStorePrefetch)topics.get(dest);
- if(tsp!=null){
- tsp.addMessageLast(node);
+ }
+ }
+ if(msg.isPersistent()){
+ Destination dest=msg.getRegionDestination();
+ TopicStorePrefetch tsp=(TopicStorePrefetch)topics.get(dest);
+ if(tsp!=null){
+ tsp.addMessageLast(node);
+ if(started){
// if the store has been empty - then this message is next to dispatch
if((pendingCount-nonPersistent.size())<=0){
tsp.nextToDispatch(node.getMessageId());
}
}
}
- pendingCount++;
}
}
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java?view=diff&rev=454471&r1=454470&r2=454471
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java Mon Oct 9 12:38:17 2006
@@ -84,6 +84,12 @@
throw new RuntimeException(e);
}
}
+
+ public synchronized void addMessageLast(MessageReference node) throws Exception{
+ if(node!=null){
+ node.decrementReferenceCount();
+ }
+ }
public synchronized boolean hasNext(){
if(isEmpty()){
@@ -112,6 +118,7 @@
public void recoverMessage(Message message) throws Exception{
message.setRegionDestination(regionDestination);
+ message.incrementReferenceCount();
batchList.addLast(message);
}
Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/InactiveDurableTopicTest.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/InactiveDurableTopicTest.java?view=diff&rev=454471&r1=454470&r2=454471
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/InactiveDurableTopicTest.java (original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/InactiveDurableTopicTest.java Mon Oct 9 12:38:17 2006
@@ -34,6 +34,7 @@
* @version $Revision$
*/
public class InactiveDurableTopicTest extends TestCase{
+ private static final int MESSAGE_COUNT = 100000;
private static final String DEFAULT_PASSWORD="";
private static final String USERNAME="testuser";
private static final String CLIENTID="mytestclient";
@@ -53,7 +54,7 @@
super.setUp();
broker=new BrokerService();
- //broker.setPersistenceAdapter(new KahaPersistenceAdapter(new File ("TEST_STUFD")));
+ broker.setPersistenceAdapter(new KahaPersistenceAdapter(new File ("TEST_STUFD")));
/*
DefaultPersistenceAdapterFactory factory = new DefaultPersistenceAdapterFactory();
factory.setDataDirectoryFile(broker.getDataDirectory());
@@ -84,6 +85,7 @@
connection=connectionFactory.createConnection(USERNAME,DEFAULT_PASSWORD);
assertNotNull(connection);
connection.setClientID(CLIENTID);
+ connection.start();
session=connection.createSession(false,javax.jms.Session.CLIENT_ACKNOWLEDGE);
assertNotNull(session);
topic=session.createTopic(TOPIC_NAME);
@@ -119,18 +121,51 @@
assertNotNull(msg);
msg.setString("key1","value1");
int loop;
- for(loop=0;loop<100000;loop++){
+ for(loop=0;loop<MESSAGE_COUNT;loop++){
msg.setInt("key2",loop);
publisher.send(msg,deliveryMode,deliveryPriority,Message.DEFAULT_TIME_TO_LIVE);
if (loop%500==0){
System.out.println("Sent " + loop + " messages");
}
}
- this.assertEquals(loop,100000);
+ this.assertEquals(loop,MESSAGE_COUNT);
publisher.close();
session.close();
connection.stop();
connection.stop();
+ }catch(JMSException ex){
+ try{
+ connection.close();
+ }catch(Exception ignore){}
+ throw new AssertionFailedError("Create Subscription caught: "+ex);
+ }
+ }
+ public void test3CreateSubscription() throws Exception{
+ try{
+ /*
+ * Step 1 - Establish a connection with a client id and create a durable subscription
+ */
+ connection=connectionFactory.createConnection(USERNAME,DEFAULT_PASSWORD);
+ assertNotNull(connection);
+ connection.setClientID(CLIENTID);
+ connection.start();
+ session=connection.createSession(false,javax.jms.Session.AUTO_ACKNOWLEDGE);
+ assertNotNull(session);
+ topic=session.createTopic(TOPIC_NAME);
+ assertNotNull(topic);
+ subscriber=session.createDurableSubscriber(topic,SUBID,"",false);
+ assertNotNull(subscriber);
+ int loop;
+ for(loop=0;loop<MESSAGE_COUNT;loop++){
+ Message msg = subscriber.receive();
+ if (loop%500==0){
+ System.out.println("Received " + loop + " messages");
+ }
+ }
+ this.assertEquals(loop,MESSAGE_COUNT);
+ subscriber.close();
+ session.close();
+ connection.close();
}catch(JMSException ex){
try{
connection.close();