You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2009/10/02 19:50:55 UTC
svn commit: r821112 - in /activemq/branches/activemq-5.3:
activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/
activemq-pool/ activemq-pool/src/test/java/org/apache/activemq/usecases/
activemq-pool/src/test/resources/
Author: dejanb
Date: Fri Oct 2 17:50:55 2009
New Revision: 821112
URL: http://svn.apache.org/viewvc?rev=821112&view=rev
Log:
merging 820268,821106 - jdbc recovery
Added:
activemq/branches/activemq-5.3/activemq-pool/src/test/java/org/apache/activemq/usecases/ConsumerThread.java
- copied, changed from r820268, activemq/trunk/activemq-pool/src/test/java/org/apache/activemq/usecases/ConsumerThread.java
activemq/branches/activemq-5.3/activemq-pool/src/test/java/org/apache/activemq/usecases/JDBCSpringTest.java
- copied, changed from r820268, activemq/trunk/activemq-pool/src/test/java/org/apache/activemq/usecases/JDBCSpringTest.java
activemq/branches/activemq-5.3/activemq-pool/src/test/java/org/apache/activemq/usecases/MessageDrivenPojo.java
- copied unchanged from r820268, activemq/trunk/activemq-pool/src/test/java/org/apache/activemq/usecases/MessageDrivenPojo.java
activemq/branches/activemq-5.3/activemq-pool/src/test/java/org/apache/activemq/usecases/ProducerThread.java
- copied unchanged from r820268, activemq/trunk/activemq-pool/src/test/java/org/apache/activemq/usecases/ProducerThread.java
activemq/branches/activemq-5.3/activemq-pool/src/test/resources/
- copied from r820268, activemq/trunk/activemq-pool/src/test/resources/
activemq/branches/activemq-5.3/activemq-pool/src/test/resources/activemq-spring-jdbc.xml
- copied, changed from r820268, activemq/trunk/activemq-pool/src/test/resources/activemq-spring-jdbc.xml
activemq/branches/activemq-5.3/activemq-pool/src/test/resources/log4j.properties
- copied unchanged from r820268, activemq/trunk/activemq-pool/src/test/resources/log4j.properties
Modified:
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
activemq/branches/activemq-5.3/activemq-pool/pom.xml
Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java?rev=821112&r1=821111&r2=821112&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java (original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java Fri Oct 2 17:50:55 2009
@@ -22,13 +22,11 @@
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
-
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.SubscriptionInfo;
@@ -57,7 +55,7 @@
private static final Log LOG = LogFactory.getLog(DefaultJDBCAdapter.class);
protected Statements statements;
protected boolean batchStatments = true;
- private Set<Long> lastRecoveredMessagesIds = Collections.synchronizedSet(new TreeSet<Long>());
+ private Set<Long> lastRecoveredMessagesIds = new TreeSet<Long>();
protected void setBinaryData(PreparedStatement s, int index, byte data[]) throws SQLException {
s.setBytes(index, data);
@@ -738,6 +736,7 @@
this.lastRecoveredMessagesIds.add(id);
} else {
LOG.debug("Stopped recover next messages");
+ break;
}
}
} else {
@@ -753,6 +752,7 @@
this.lastRecoveredMessagesIds.add(id);
} else {
LOG.debug("Stopped recover next messages");
+ break;
}
}
}
Modified: activemq/branches/activemq-5.3/activemq-pool/pom.xml
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-pool/pom.xml?rev=821112&r1=821111&r2=821112&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-pool/pom.xml (original)
+++ activemq/branches/activemq-5.3/activemq-pool/pom.xml Fri Oct 2 17:50:55 2009
@@ -94,6 +94,21 @@
<artifactId>spring-jms</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.xbean</groupId>
+ <artifactId>xbean-spring</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.derby</groupId>
+ <artifactId>derby</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
Copied: activemq/branches/activemq-5.3/activemq-pool/src/test/java/org/apache/activemq/usecases/ConsumerThread.java (from r820268, activemq/trunk/activemq-pool/src/test/java/org/apache/activemq/usecases/ConsumerThread.java)
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-pool/src/test/java/org/apache/activemq/usecases/ConsumerThread.java?p2=activemq/branches/activemq-5.3/activemq-pool/src/test/java/org/apache/activemq/usecases/ConsumerThread.java&p1=activemq/trunk/activemq-pool/src/test/java/org/apache/activemq/usecases/ConsumerThread.java&r1=820268&r2=821112&rev=821112&view=diff
==============================================================================
--- activemq/trunk/activemq-pool/src/test/java/org/apache/activemq/usecases/ConsumerThread.java (original)
+++ activemq/branches/activemq-5.3/activemq-pool/src/test/java/org/apache/activemq/usecases/ConsumerThread.java Fri Oct 2 17:50:55 2009
@@ -40,6 +40,7 @@
private boolean running;
private Log log = LogFactory.getLog(ConsumerThread.class);
private int numberOfQueues;
+ private String consumerName;
@Override
public void run() {
@@ -69,7 +70,7 @@
}
private DefaultMessageListenerContainer createContainer() {
- Random generator = new Random();
+ Random generator = new Random(consumerName.hashCode());
int queueSuffix = generator.nextInt(numberOfQueues);
@@ -94,6 +95,7 @@
}
*/
container.afterPropertiesSet();
+ log.info("subscribing to " + destination + queueSuffix);
return container;
}
@@ -125,6 +127,11 @@
public int getNumberOfQueues() {
return this.numberOfQueues;
}
+
+
+ public void setConsumerName(String name) {
+ this.consumerName = name;
+ }
/**
* @param connectionFactory the connectionFactory to set
Copied: activemq/branches/activemq-5.3/activemq-pool/src/test/java/org/apache/activemq/usecases/JDBCSpringTest.java (from r820268, activemq/trunk/activemq-pool/src/test/java/org/apache/activemq/usecases/JDBCSpringTest.java)
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-pool/src/test/java/org/apache/activemq/usecases/JDBCSpringTest.java?p2=activemq/branches/activemq-5.3/activemq-pool/src/test/java/org/apache/activemq/usecases/JDBCSpringTest.java&p1=activemq/trunk/activemq-pool/src/test/java/org/apache/activemq/usecases/JDBCSpringTest.java&r1=820268&r2=821112&rev=821112&view=diff
==============================================================================
--- activemq/trunk/activemq-pool/src/test/java/org/apache/activemq/usecases/JDBCSpringTest.java (original)
+++ activemq/branches/activemq-5.3/activemq-pool/src/test/java/org/apache/activemq/usecases/JDBCSpringTest.java Fri Oct 2 17:50:55 2009
@@ -36,9 +36,9 @@
private static Log log = LogFactory.getLog(JDBCSpringTest.class);
- int numberOfConsumerThreads = 50;
- int numberOfProducerThreads = 50;
- int numberOfMessages = 100;
+ int numberOfConsumerThreads = 20;
+ int numberOfProducerThreads = 20;
+ int numberOfMessages = 50;
int numberOfQueues = 5;
String url = "tcp://localhost:61616";
@@ -46,7 +46,6 @@
public void setUp() throws Exception {
broker = BrokerFactory.createBroker("xbean:activemq-spring-jdbc.xml");
- //broker.deleteAllMessages();
broker.start();
broker.waitUntilStarted();
}
@@ -87,7 +86,7 @@
thread.setMessage(twoKbMessage);
thread.setNumberOfMessagesToSend(numberOfMessages);
thread.setNumberOfQueues(numberOfQueues);
- thread.setQueuePrefix("DEV-1786.queue.");
+ thread.setQueuePrefix("AMQ-2436.queue.");
thread.setConnectionFactory(connectionFactory);
thread.setSendDelay(100);
ProducerThreads.add(thread);
@@ -100,10 +99,11 @@
thread.setMessageDrivenPojo(mdp1);
thread.setConcurrentConsumers(1);
thread.setConnectionFactory(connectionFactory);
- thread.setDestination("DEV-1786.queue.");
+ thread.setDestination("AMQ-2436.queue.");
thread.setPubSubDomain(false);
thread.setSessionTransacted(true);
thread.setNumberOfQueues(numberOfQueues);
+ thread.setConsumerName("consumer" + i);
ConsumerThreads.add(thread);
thread.start();
@@ -121,6 +121,7 @@
boolean finished = false;
int retry = 0;
+ int previous = 0;
while (!finished) {
int totalMessages = 0;
@@ -128,8 +129,17 @@
for (Thread thread : ConsumerThreads) {
totalMessages += ((ConsumerThread)thread).getMessageDrivenPojo().getMessageCount();
}
+ log.info(totalMessages + " received so far...");
+ if (totalMessages != 0 && previous == totalMessages) {
+ for (Thread thread : ConsumerThreads) {
+ ((ConsumerThread)thread).setRun(false);
+ }
+ Thread.sleep(3000);
+ fail("Received " + totalMessages + ", expected " + (numberOfMessages * numberOfProducerThreads));
+ }
+ previous = totalMessages;
- if (totalMessages == (numberOfMessages * numberOfProducerThreads)) {
+ if (totalMessages >= (numberOfMessages * numberOfProducerThreads)) {
finished = true;
log.info("Received all " + totalMessages + " messages. Finishing.");
@@ -141,11 +151,7 @@
}
} else {
- if (retry == 10) {
- fail("Received " + totalMessages + ", expected " + (numberOfMessages * numberOfProducerThreads));
- }
Thread.sleep(10000);
- log.info(totalMessages + " received so far...");
}
}
}
Copied: activemq/branches/activemq-5.3/activemq-pool/src/test/resources/activemq-spring-jdbc.xml (from r820268, activemq/trunk/activemq-pool/src/test/resources/activemq-spring-jdbc.xml)
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-pool/src/test/resources/activemq-spring-jdbc.xml?p2=activemq/branches/activemq-5.3/activemq-pool/src/test/resources/activemq-spring-jdbc.xml&p1=activemq/trunk/activemq-pool/src/test/resources/activemq-spring-jdbc.xml&r1=820268&r2=821112&rev=821112&view=diff
==============================================================================
--- activemq/trunk/activemq-pool/src/test/resources/activemq-spring-jdbc.xml (original)
+++ activemq/branches/activemq-5.3/activemq-pool/src/test/resources/activemq-spring-jdbc.xml Fri Oct 2 17:50:55 2009
@@ -23,16 +23,27 @@
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd
http://activemq.apache.org/camel/schema/spring http://activemq.apache.org/camel/schema/spring/camel-spring.xsd">
- <broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="target/" useJmx="false">
+ <broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="target/" useJmx="false" deleteAllMessagesOnStartup="true">
<persistenceAdapter>
- <jdbcPersistenceAdapter dataSource="#derby-ds" dataDirectory="traget/"/>
+ <jdbcPersistenceAdapter dataSource="#derby-ds" dataDirectory="target/"/>
</persistenceAdapter>
+ <destinationPolicy>
+ <policyMap>
+ <policyEntries>
+ <policyEntry queue=">" memoryLimit="10240"/>
+ <policyEntry topic=">" memoryLimit="10240">
+ </policyEntry>
+ </policyEntries>
+ </policyMap>
+ </destinationPolicy>
+
+
<systemUsage>
<systemUsage>
<memoryUsage>
- <memoryUsage limit="20 mb"/>
+ <memoryUsage limit="102400"/>
</memoryUsage>
<storeUsage>
<storeUsage limit="1 gb" name="foo"/>