You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2009/10/02 19:30:23 UTC
svn commit: r821106 - in /activemq/trunk:
activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/
activemq-pool/src/test/java/org/apache/activemq/usecases/
activemq-pool/src/test/resources/
Author: dejanb
Date: Fri Oct 2 17:30:23 2009
New Revision: 821106
URL: http://svn.apache.org/viewvc?rev=821106&view=rev
Log:
https://issues.apache.org/activemq/browse/AMQ-2437 - jdbc recovery
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
activemq/trunk/activemq-pool/src/test/java/org/apache/activemq/usecases/ConsumerThread.java
activemq/trunk/activemq-pool/src/test/java/org/apache/activemq/usecases/JDBCSpringTest.java
activemq/trunk/activemq-pool/src/test/resources/activemq-spring-jdbc.xml
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java?rev=821106&r1=821105&r2=821106&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java Fri Oct 2 17:30:23 2009
@@ -738,6 +738,7 @@
this.lastRecoveredMessagesIds.add(id);
} else {
LOG.debug("Stopped recover next messages");
+ break;
}
}
} else {
@@ -753,6 +754,7 @@
this.lastRecoveredMessagesIds.add(id);
} else {
LOG.debug("Stopped recover next messages");
+ break;
}
}
}
Modified: activemq/trunk/activemq-pool/src/test/java/org/apache/activemq/usecases/ConsumerThread.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-pool/src/test/java/org/apache/activemq/usecases/ConsumerThread.java?rev=821106&r1=821105&r2=821106&view=diff
==============================================================================
--- activemq/trunk/activemq-pool/src/test/java/org/apache/activemq/usecases/ConsumerThread.java (original)
+++ activemq/trunk/activemq-pool/src/test/java/org/apache/activemq/usecases/ConsumerThread.java Fri Oct 2 17:30:23 2009
@@ -40,6 +40,7 @@
private boolean running;
private Log log = LogFactory.getLog(ConsumerThread.class);
private int numberOfQueues;
+ private String consumerName;
@Override
public void run() {
@@ -69,7 +70,7 @@
}
private DefaultMessageListenerContainer createContainer() {
- Random generator = new Random();
+ Random generator = new Random(consumerName.hashCode());
int queueSuffix = generator.nextInt(numberOfQueues);
@@ -94,6 +95,7 @@
}
*/
container.afterPropertiesSet();
+ log.info("subscribing to " + destination + queueSuffix);
return container;
}
@@ -125,6 +127,11 @@
public int getNumberOfQueues() {
return this.numberOfQueues;
}
+
+
+ public void setConsumerName(String name) {
+ this.consumerName = name;
+ }
/**
* @param connectionFactory the connectionFactory to set
Modified: activemq/trunk/activemq-pool/src/test/java/org/apache/activemq/usecases/JDBCSpringTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-pool/src/test/java/org/apache/activemq/usecases/JDBCSpringTest.java?rev=821106&r1=821105&r2=821106&view=diff
==============================================================================
--- activemq/trunk/activemq-pool/src/test/java/org/apache/activemq/usecases/JDBCSpringTest.java (original)
+++ activemq/trunk/activemq-pool/src/test/java/org/apache/activemq/usecases/JDBCSpringTest.java Fri Oct 2 17:30:23 2009
@@ -36,9 +36,9 @@
private static Log log = LogFactory.getLog(JDBCSpringTest.class);
- int numberOfConsumerThreads = 50;
- int numberOfProducerThreads = 50;
- int numberOfMessages = 100;
+ int numberOfConsumerThreads = 20;
+ int numberOfProducerThreads = 20;
+ int numberOfMessages = 50;
int numberOfQueues = 5;
String url = "tcp://localhost:61616";
@@ -46,7 +46,6 @@
public void setUp() throws Exception {
broker = BrokerFactory.createBroker("xbean:activemq-spring-jdbc.xml");
- //broker.deleteAllMessages();
broker.start();
broker.waitUntilStarted();
}
@@ -87,7 +86,7 @@
thread.setMessage(twoKbMessage);
thread.setNumberOfMessagesToSend(numberOfMessages);
thread.setNumberOfQueues(numberOfQueues);
- thread.setQueuePrefix("DEV-1786.queue.");
+ thread.setQueuePrefix("AMQ-2436.queue.");
thread.setConnectionFactory(connectionFactory);
thread.setSendDelay(100);
ProducerThreads.add(thread);
@@ -100,10 +99,11 @@
thread.setMessageDrivenPojo(mdp1);
thread.setConcurrentConsumers(1);
thread.setConnectionFactory(connectionFactory);
- thread.setDestination("DEV-1786.queue.");
+ thread.setDestination("AMQ-2436.queue.");
thread.setPubSubDomain(false);
thread.setSessionTransacted(true);
thread.setNumberOfQueues(numberOfQueues);
+ thread.setConsumerName("consumer" + i);
ConsumerThreads.add(thread);
thread.start();
@@ -121,6 +121,7 @@
boolean finished = false;
int retry = 0;
+ int previous = 0;
while (!finished) {
int totalMessages = 0;
@@ -128,8 +129,17 @@
for (Thread thread : ConsumerThreads) {
totalMessages += ((ConsumerThread)thread).getMessageDrivenPojo().getMessageCount();
}
+ log.info(totalMessages + " received so far...");
+ if (totalMessages != 0 && previous == totalMessages) {
+ for (Thread thread : ConsumerThreads) {
+ ((ConsumerThread)thread).setRun(false);
+ }
+ Thread.sleep(3000);
+ fail("Received " + totalMessages + ", expected " + (numberOfMessages * numberOfProducerThreads));
+ }
+ previous = totalMessages;
- if (totalMessages == (numberOfMessages * numberOfProducerThreads)) {
+ if (totalMessages >= (numberOfMessages * numberOfProducerThreads)) {
finished = true;
log.info("Received all " + totalMessages + " messages. Finishing.");
@@ -141,11 +151,7 @@
}
} else {
- if (retry == 10) {
- fail("Received " + totalMessages + ", expected " + (numberOfMessages * numberOfProducerThreads));
- }
Thread.sleep(10000);
- log.info(totalMessages + " received so far...");
}
}
}
Modified: activemq/trunk/activemq-pool/src/test/resources/activemq-spring-jdbc.xml
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-pool/src/test/resources/activemq-spring-jdbc.xml?rev=821106&r1=821105&r2=821106&view=diff
==============================================================================
--- activemq/trunk/activemq-pool/src/test/resources/activemq-spring-jdbc.xml (original)
+++ activemq/trunk/activemq-pool/src/test/resources/activemq-spring-jdbc.xml Fri Oct 2 17:30:23 2009
@@ -23,16 +23,27 @@
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd
http://activemq.apache.org/camel/schema/spring http://activemq.apache.org/camel/schema/spring/camel-spring.xsd">
- <broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="target/" useJmx="false">
+ <broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="target/" useJmx="false" deleteAllMessagesOnStartup="true">
<persistenceAdapter>
- <jdbcPersistenceAdapter dataSource="#derby-ds" dataDirectory="traget/"/>
+ <jdbcPersistenceAdapter dataSource="#derby-ds" dataDirectory="target/"/>
</persistenceAdapter>
+ <destinationPolicy>
+ <policyMap>
+ <policyEntries>
+ <policyEntry queue=">" memoryLimit="10240"/>
+ <policyEntry topic=">" memoryLimit="10240">
+ </policyEntry>
+ </policyEntries>
+ </policyMap>
+ </destinationPolicy>
+
+
<systemUsage>
<systemUsage>
<memoryUsage>
- <memoryUsage limit="20 mb"/>
+ <memoryUsage limit="102400"/>
</memoryUsage>
<storeUsage>
<storeUsage limit="1 gb" name="foo"/>