You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2009/10/02 19:50:55 UTC

svn commit: r821112 - in /activemq/branches/activemq-5.3: activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/ activemq-pool/ activemq-pool/src/test/java/org/apache/activemq/usecases/ activemq-pool/src/test/resources/

Author: dejanb
Date: Fri Oct  2 17:50:55 2009
New Revision: 821112

URL: http://svn.apache.org/viewvc?rev=821112&view=rev
Log:
Merging r820268 and r821106 - JDBC recovery

Added:
    activemq/branches/activemq-5.3/activemq-pool/src/test/java/org/apache/activemq/usecases/ConsumerThread.java
      - copied, changed from r820268, activemq/trunk/activemq-pool/src/test/java/org/apache/activemq/usecases/ConsumerThread.java
    activemq/branches/activemq-5.3/activemq-pool/src/test/java/org/apache/activemq/usecases/JDBCSpringTest.java
      - copied, changed from r820268, activemq/trunk/activemq-pool/src/test/java/org/apache/activemq/usecases/JDBCSpringTest.java
    activemq/branches/activemq-5.3/activemq-pool/src/test/java/org/apache/activemq/usecases/MessageDrivenPojo.java
      - copied unchanged from r820268, activemq/trunk/activemq-pool/src/test/java/org/apache/activemq/usecases/MessageDrivenPojo.java
    activemq/branches/activemq-5.3/activemq-pool/src/test/java/org/apache/activemq/usecases/ProducerThread.java
      - copied unchanged from r820268, activemq/trunk/activemq-pool/src/test/java/org/apache/activemq/usecases/ProducerThread.java
    activemq/branches/activemq-5.3/activemq-pool/src/test/resources/
      - copied from r820268, activemq/trunk/activemq-pool/src/test/resources/
    activemq/branches/activemq-5.3/activemq-pool/src/test/resources/activemq-spring-jdbc.xml
      - copied, changed from r820268, activemq/trunk/activemq-pool/src/test/resources/activemq-spring-jdbc.xml
    activemq/branches/activemq-5.3/activemq-pool/src/test/resources/log4j.properties
      - copied unchanged from r820268, activemq/trunk/activemq-pool/src/test/resources/log4j.properties
Modified:
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
    activemq/branches/activemq-5.3/activemq-pool/pom.xml

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java?rev=821112&r1=821111&r2=821112&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java (original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java Fri Oct  2 17:50:55 2009
@@ -22,13 +22,11 @@
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
 import java.util.TreeSet;
-
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.MessageId;
 import org.apache.activemq.command.SubscriptionInfo;
@@ -57,7 +55,7 @@
     private static final Log LOG = LogFactory.getLog(DefaultJDBCAdapter.class);
     protected Statements statements;
     protected boolean batchStatments = true;
-    private Set<Long> lastRecoveredMessagesIds = Collections.synchronizedSet(new TreeSet<Long>());
+    private Set<Long> lastRecoveredMessagesIds = new TreeSet<Long>();
 
     protected void setBinaryData(PreparedStatement s, int index, byte data[]) throws SQLException {
         s.setBytes(index, data);
@@ -738,6 +736,7 @@
                         this.lastRecoveredMessagesIds.add(id);
                     } else {
                         LOG.debug("Stopped recover next messages");
+                        break;
                     }
                 }
             } else {
@@ -753,6 +752,7 @@
                         this.lastRecoveredMessagesIds.add(id);
                     } else {
                         LOG.debug("Stopped recover next messages");
+                        break;
                     }
                 }
             }

Modified: activemq/branches/activemq-5.3/activemq-pool/pom.xml
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-pool/pom.xml?rev=821112&r1=821111&r2=821112&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-pool/pom.xml (original)
+++ activemq/branches/activemq-5.3/activemq-pool/pom.xml Fri Oct  2 17:50:55 2009
@@ -94,6 +94,21 @@
       <artifactId>spring-jms</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+    	<groupId>org.apache.xbean</groupId>
+    	<artifactId>xbean-spring</artifactId>
+    	<scope>test</scope>
+    </dependency>
+    <dependency>
+    	<groupId>log4j</groupId>
+    	<artifactId>log4j</artifactId>
+    	<scope>test</scope>
+    </dependency>
+    <dependency>
+    	<groupId>org.apache.derby</groupId>
+    	<artifactId>derby</artifactId>
+    	<scope>test</scope>
+    </dependency>
   </dependencies>
 
 </project>

Copied: activemq/branches/activemq-5.3/activemq-pool/src/test/java/org/apache/activemq/usecases/ConsumerThread.java (from r820268, activemq/trunk/activemq-pool/src/test/java/org/apache/activemq/usecases/ConsumerThread.java)
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-pool/src/test/java/org/apache/activemq/usecases/ConsumerThread.java?p2=activemq/branches/activemq-5.3/activemq-pool/src/test/java/org/apache/activemq/usecases/ConsumerThread.java&p1=activemq/trunk/activemq-pool/src/test/java/org/apache/activemq/usecases/ConsumerThread.java&r1=820268&r2=821112&rev=821112&view=diff
==============================================================================
--- activemq/trunk/activemq-pool/src/test/java/org/apache/activemq/usecases/ConsumerThread.java (original)
+++ activemq/branches/activemq-5.3/activemq-pool/src/test/java/org/apache/activemq/usecases/ConsumerThread.java Fri Oct  2 17:50:55 2009
@@ -40,6 +40,7 @@
 	private boolean running;
 	private Log log = LogFactory.getLog(ConsumerThread.class);
 	private int numberOfQueues;
+	private String consumerName;
 	
 	@Override
 	public void run() {		
@@ -69,7 +70,7 @@
 	}
 
 	private DefaultMessageListenerContainer createContainer() {
-		Random generator = new Random();
+		Random generator = new Random(consumerName.hashCode());
 		int queueSuffix = generator.nextInt(numberOfQueues);
 		
 		
@@ -94,6 +95,7 @@
 		}
 		*/
 		container.afterPropertiesSet();
+		log.info("subscribing to " + destination + queueSuffix);
 		return container;
 	}
 		
@@ -125,6 +127,11 @@
 	public int getNumberOfQueues() {
 		return this.numberOfQueues;
 	}
+	
+
+	public void setConsumerName(String name) {
+		this.consumerName = name;
+	}
 
 	/**
 	 * @param connectionFactory the connectionFactory to set

Copied: activemq/branches/activemq-5.3/activemq-pool/src/test/java/org/apache/activemq/usecases/JDBCSpringTest.java (from r820268, activemq/trunk/activemq-pool/src/test/java/org/apache/activemq/usecases/JDBCSpringTest.java)
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-pool/src/test/java/org/apache/activemq/usecases/JDBCSpringTest.java?p2=activemq/branches/activemq-5.3/activemq-pool/src/test/java/org/apache/activemq/usecases/JDBCSpringTest.java&p1=activemq/trunk/activemq-pool/src/test/java/org/apache/activemq/usecases/JDBCSpringTest.java&r1=820268&r2=821112&rev=821112&view=diff
==============================================================================
--- activemq/trunk/activemq-pool/src/test/java/org/apache/activemq/usecases/JDBCSpringTest.java (original)
+++ activemq/branches/activemq-5.3/activemq-pool/src/test/java/org/apache/activemq/usecases/JDBCSpringTest.java Fri Oct  2 17:50:55 2009
@@ -36,9 +36,9 @@
 	
 	private static Log log = LogFactory.getLog(JDBCSpringTest.class);
 	
-	  int numberOfConsumerThreads = 50;
-      int numberOfProducerThreads = 50;
-      int numberOfMessages = 100;
+      int numberOfConsumerThreads = 20;
+      int numberOfProducerThreads = 20;
+      int numberOfMessages = 50;
       int numberOfQueues = 5;
       String url = "tcp://localhost:61616";
 	
@@ -46,7 +46,6 @@
       
     public void setUp() throws Exception {
     	broker = BrokerFactory.createBroker("xbean:activemq-spring-jdbc.xml");
-    	//broker.deleteAllMessages();
     	broker.start();
     	broker.waitUntilStarted();
     }
@@ -87,7 +86,7 @@
 			thread.setMessage(twoKbMessage);
 			thread.setNumberOfMessagesToSend(numberOfMessages);
 			thread.setNumberOfQueues(numberOfQueues);
-			thread.setQueuePrefix("DEV-1786.queue.");
+			thread.setQueuePrefix("AMQ-2436.queue.");
 			thread.setConnectionFactory(connectionFactory);
 			thread.setSendDelay(100);
 			ProducerThreads.add(thread);
@@ -100,10 +99,11 @@
 			thread.setMessageDrivenPojo(mdp1);
 			thread.setConcurrentConsumers(1);
 			thread.setConnectionFactory(connectionFactory);
-			thread.setDestination("DEV-1786.queue.");
+			thread.setDestination("AMQ-2436.queue.");
 			thread.setPubSubDomain(false);
 			thread.setSessionTransacted(true);
 			thread.setNumberOfQueues(numberOfQueues);
+			thread.setConsumerName("consumer" + i);
 			ConsumerThreads.add(thread);
 			thread.start();
 			
@@ -121,6 +121,7 @@
 		
 		boolean finished = false;	
 		int retry = 0;
+		int previous = 0;
 		while (!finished) {
                     
 			int totalMessages = 0;	
@@ -128,8 +129,17 @@
 			for (Thread thread : ConsumerThreads) {
 				totalMessages += ((ConsumerThread)thread).getMessageDrivenPojo().getMessageCount();
 			}
+			log.info(totalMessages + " received so far...");
+			if (totalMessages != 0 && previous == totalMessages) {
+				for (Thread thread : ConsumerThreads) {
+					((ConsumerThread)thread).setRun(false);
+				}
+				Thread.sleep(3000);
+				fail("Received " + totalMessages + ", expected " + (numberOfMessages * numberOfProducerThreads));
+			}
+			previous = totalMessages;
 			
-			if (totalMessages == (numberOfMessages * numberOfProducerThreads)) {
+			if (totalMessages >= (numberOfMessages * numberOfProducerThreads)) {
 				finished = true;
 				log.info("Received all " + totalMessages + " messages. Finishing.");
 				
@@ -141,11 +151,7 @@
 				}
 
 			} else {
-				if (retry == 10) {
-					fail("Received " + totalMessages + ", expected " + (numberOfMessages * numberOfProducerThreads));
-				}
 				Thread.sleep(10000);
-				log.info(totalMessages + " received so far...");
 			}
 		}
 	}	

Copied: activemq/branches/activemq-5.3/activemq-pool/src/test/resources/activemq-spring-jdbc.xml (from r820268, activemq/trunk/activemq-pool/src/test/resources/activemq-spring-jdbc.xml)
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-pool/src/test/resources/activemq-spring-jdbc.xml?p2=activemq/branches/activemq-5.3/activemq-pool/src/test/resources/activemq-spring-jdbc.xml&p1=activemq/trunk/activemq-pool/src/test/resources/activemq-spring-jdbc.xml&r1=820268&r2=821112&rev=821112&view=diff
==============================================================================
--- activemq/trunk/activemq-pool/src/test/resources/activemq-spring-jdbc.xml (original)
+++ activemq/branches/activemq-5.3/activemq-pool/src/test/resources/activemq-spring-jdbc.xml Fri Oct  2 17:50:55 2009
@@ -23,16 +23,27 @@
   http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd   
   http://activemq.apache.org/camel/schema/spring http://activemq.apache.org/camel/schema/spring/camel-spring.xsd">
 
-    <broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="target/" useJmx="false">
+    <broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="target/" useJmx="false" deleteAllMessagesOnStartup="true">
          
         <persistenceAdapter>
-            <jdbcPersistenceAdapter dataSource="#derby-ds" dataDirectory="traget/"/>
+            <jdbcPersistenceAdapter dataSource="#derby-ds" dataDirectory="target/"/>
         </persistenceAdapter>
         
+        <destinationPolicy>
+            <policyMap>
+                <policyEntries>
+                    <policyEntry queue=">" memoryLimit="10240"/>
+                    <policyEntry topic=">" memoryLimit="10240">
+                    </policyEntry>
+                </policyEntries>
+            </policyMap>
+        </destinationPolicy>
+        
+        
         <systemUsage>
             <systemUsage>
                 <memoryUsage>
-                    <memoryUsage limit="20 mb"/>
+                    <memoryUsage limit="102400"/>
                 </memoryUsage>
                 <storeUsage>
                     <storeUsage limit="1 gb" name="foo"/>