You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2009/10/02 19:30:23 UTC

svn commit: r821106 - in /activemq/trunk: activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/ activemq-pool/src/test/java/org/apache/activemq/usecases/ activemq-pool/src/test/resources/

Author: dejanb
Date: Fri Oct  2 17:30:23 2009
New Revision: 821106

URL: http://svn.apache.org/viewvc?rev=821106&view=rev
Log:
https://issues.apache.org/activemq/browse/AMQ-2437 - JDBC recovery: break out of the recovery loops in DefaultJDBCAdapter once recovery of next messages has stopped

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
    activemq/trunk/activemq-pool/src/test/java/org/apache/activemq/usecases/ConsumerThread.java
    activemq/trunk/activemq-pool/src/test/java/org/apache/activemq/usecases/JDBCSpringTest.java
    activemq/trunk/activemq-pool/src/test/resources/activemq-spring-jdbc.xml

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java?rev=821106&r1=821105&r2=821106&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java Fri Oct  2 17:30:23 2009
@@ -738,6 +738,7 @@
                         this.lastRecoveredMessagesIds.add(id);
                     } else {
                         LOG.debug("Stopped recover next messages");
+                        break;
                     }
                 }
             } else {
@@ -753,6 +754,7 @@
                         this.lastRecoveredMessagesIds.add(id);
                     } else {
                         LOG.debug("Stopped recover next messages");
+                        break;
                     }
                 }
             }

Modified: activemq/trunk/activemq-pool/src/test/java/org/apache/activemq/usecases/ConsumerThread.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-pool/src/test/java/org/apache/activemq/usecases/ConsumerThread.java?rev=821106&r1=821105&r2=821106&view=diff
==============================================================================
--- activemq/trunk/activemq-pool/src/test/java/org/apache/activemq/usecases/ConsumerThread.java (original)
+++ activemq/trunk/activemq-pool/src/test/java/org/apache/activemq/usecases/ConsumerThread.java Fri Oct  2 17:30:23 2009
@@ -40,6 +40,7 @@
 	private boolean running;
 	private Log log = LogFactory.getLog(ConsumerThread.class);
 	private int numberOfQueues;
+	private String consumerName;
 	
 	@Override
 	public void run() {		
@@ -69,7 +70,7 @@
 	}
 
 	private DefaultMessageListenerContainer createContainer() {
-		Random generator = new Random();
+		Random generator = new Random(consumerName.hashCode());
 		int queueSuffix = generator.nextInt(numberOfQueues);
 		
 		
@@ -94,6 +95,7 @@
 		}
 		*/
 		container.afterPropertiesSet();
+		log.info("subscribing to " + destination + queueSuffix);
 		return container;
 	}
 		
@@ -125,6 +127,11 @@
 	public int getNumberOfQueues() {
 		return this.numberOfQueues;
 	}
+	
+
+	public void setConsumerName(String name) {
+		this.consumerName = name;
+	}
 
 	/**
 	 * @param connectionFactory the connectionFactory to set

Modified: activemq/trunk/activemq-pool/src/test/java/org/apache/activemq/usecases/JDBCSpringTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-pool/src/test/java/org/apache/activemq/usecases/JDBCSpringTest.java?rev=821106&r1=821105&r2=821106&view=diff
==============================================================================
--- activemq/trunk/activemq-pool/src/test/java/org/apache/activemq/usecases/JDBCSpringTest.java (original)
+++ activemq/trunk/activemq-pool/src/test/java/org/apache/activemq/usecases/JDBCSpringTest.java Fri Oct  2 17:30:23 2009
@@ -36,9 +36,9 @@
 	
 	private static Log log = LogFactory.getLog(JDBCSpringTest.class);
 	
-	  int numberOfConsumerThreads = 50;
-      int numberOfProducerThreads = 50;
-      int numberOfMessages = 100;
+      int numberOfConsumerThreads = 20;
+      int numberOfProducerThreads = 20;
+      int numberOfMessages = 50;
       int numberOfQueues = 5;
       String url = "tcp://localhost:61616";
 	
@@ -46,7 +46,6 @@
       
     public void setUp() throws Exception {
     	broker = BrokerFactory.createBroker("xbean:activemq-spring-jdbc.xml");
-    	//broker.deleteAllMessages();
     	broker.start();
     	broker.waitUntilStarted();
     }
@@ -87,7 +86,7 @@
 			thread.setMessage(twoKbMessage);
 			thread.setNumberOfMessagesToSend(numberOfMessages);
 			thread.setNumberOfQueues(numberOfQueues);
-			thread.setQueuePrefix("DEV-1786.queue.");
+			thread.setQueuePrefix("AMQ-2436.queue.");
 			thread.setConnectionFactory(connectionFactory);
 			thread.setSendDelay(100);
 			ProducerThreads.add(thread);
@@ -100,10 +99,11 @@
 			thread.setMessageDrivenPojo(mdp1);
 			thread.setConcurrentConsumers(1);
 			thread.setConnectionFactory(connectionFactory);
-			thread.setDestination("DEV-1786.queue.");
+			thread.setDestination("AMQ-2436.queue.");
 			thread.setPubSubDomain(false);
 			thread.setSessionTransacted(true);
 			thread.setNumberOfQueues(numberOfQueues);
+			thread.setConsumerName("consumer" + i);
 			ConsumerThreads.add(thread);
 			thread.start();
 			
@@ -121,6 +121,7 @@
 		
 		boolean finished = false;	
 		int retry = 0;
+		int previous = 0;
 		while (!finished) {
                     
 			int totalMessages = 0;	
@@ -128,8 +129,17 @@
 			for (Thread thread : ConsumerThreads) {
 				totalMessages += ((ConsumerThread)thread).getMessageDrivenPojo().getMessageCount();
 			}
+			log.info(totalMessages + " received so far...");
+			if (totalMessages != 0 && previous == totalMessages) {
+				for (Thread thread : ConsumerThreads) {
+					((ConsumerThread)thread).setRun(false);
+				}
+				Thread.sleep(3000);
+				fail("Received " + totalMessages + ", expected " + (numberOfMessages * numberOfProducerThreads));
+			}
+			previous = totalMessages;
 			
-			if (totalMessages == (numberOfMessages * numberOfProducerThreads)) {
+			if (totalMessages >= (numberOfMessages * numberOfProducerThreads)) {
 				finished = true;
 				log.info("Received all " + totalMessages + " messages. Finishing.");
 				
@@ -141,11 +151,7 @@
 				}
 
 			} else {
-				if (retry == 10) {
-					fail("Received " + totalMessages + ", expected " + (numberOfMessages * numberOfProducerThreads));
-				}
 				Thread.sleep(10000);
-				log.info(totalMessages + " received so far...");
 			}
 		}
 	}	

Modified: activemq/trunk/activemq-pool/src/test/resources/activemq-spring-jdbc.xml
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-pool/src/test/resources/activemq-spring-jdbc.xml?rev=821106&r1=821105&r2=821106&view=diff
==============================================================================
--- activemq/trunk/activemq-pool/src/test/resources/activemq-spring-jdbc.xml (original)
+++ activemq/trunk/activemq-pool/src/test/resources/activemq-spring-jdbc.xml Fri Oct  2 17:30:23 2009
@@ -23,16 +23,27 @@
   http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd   
   http://activemq.apache.org/camel/schema/spring http://activemq.apache.org/camel/schema/spring/camel-spring.xsd">
 
-    <broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="target/" useJmx="false">
+    <broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="target/" useJmx="false" deleteAllMessagesOnStartup="true">
          
         <persistenceAdapter>
-            <jdbcPersistenceAdapter dataSource="#derby-ds" dataDirectory="traget/"/>
+            <jdbcPersistenceAdapter dataSource="#derby-ds" dataDirectory="target/"/>
         </persistenceAdapter>
         
+        <destinationPolicy>
+            <policyMap>
+                <policyEntries>
+                    <policyEntry queue=">" memoryLimit="10240"/>
+                    <policyEntry topic=">" memoryLimit="10240">
+                    </policyEntry>
+                </policyEntries>
+            </policyMap>
+        </destinationPolicy>
+        
+        
         <systemUsage>
             <systemUsage>
                 <memoryUsage>
-                    <memoryUsage limit="20 mb"/>
+                    <memoryUsage limit="102400"/>
                 </memoryUsage>
                 <storeUsage>
                     <storeUsage limit="1 gb" name="foo"/>