You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2006/10/09 15:05:22 UTC
svn commit: r454368 [3/3] - in /incubator/activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/broker/
main/java/org/apache/activemq/broker/region/
main/java/org/apache/activemq/broker/region/cursors/
main/java/org/apache/activemq/kaha/ main...
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java?view=diff&rev=454368&r1=454367&r2=454368
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java Mon Oct 9 06:05:20 2006
@@ -156,14 +156,34 @@
public SubscriptionInfo[] getAllSubscriptions() throws IOException {
return (SubscriptionInfo[]) subscriberDatabase.values().toArray(new SubscriptionInfo[subscriberDatabase.size()]);
}
- public Message getNextMessageToDeliver(String clientId,String subscriptionName) throws IOException{
- MessageId lastAck=(MessageId) ackDatabase.get(new SubscriptionKey(clientId,subscriptionName));
+ public MessageId getNextMessageIdToDeliver(String clientId,String subscriptionName,MessageId id) throws IOException{
// the message table is a synchronizedMap - so just have to synchronize here
+ boolean matchFound = false;
synchronized(messageTable){
for(Iterator iter=messageTable.entrySet().iterator();iter.hasNext();){
Map.Entry entry=(Entry) iter.next();
- if(entry.getKey().equals(lastAck)){
- return (Message) entry.getValue();
+ if(!matchFound && entry.getKey().equals(id)){
+ matchFound = true;
+ }else if (matchFound) {
+ Message msg = (Message) entry.getValue();
+ return msg.getMessageId();
+ }
+ }
+ }
+ return null;
+ }
+
+ public MessageId getPreviousMessageIdToDeliver(String clientId,String subscriptionName,MessageId id) throws IOException{
+ // the message table is a synchronizedMap - so just have to synchronize here
+ Message last= null;
+ synchronized(messageTable){
+ for(Iterator iter=messageTable.entrySet().iterator();iter.hasNext();){
+ Map.Entry entry=(Entry) iter.next();
+
+ if(entry.getKey().equals(id)){
+ return last != null ? last.getMessageId() : null;
+ }else {
+ last = (Message)entry.getValue();
}
}
}
@@ -184,6 +204,9 @@
}
}
return result;
+ }
+
+ public void resetBatching(String clientId,String subscriptionName,MessageId id) { // no-op: the body is empty and all three arguments are ignored
}
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTopicMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTopicMessageStore.java?view=diff&rev=454368&r1=454367&r2=454368
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTopicMessageStore.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/rapid/RapidTopicMessageStore.java Mon Oct 9 06:05:20 2006
@@ -1,20 +1,17 @@
/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
*/
+
package org.apache.activemq.store.rapid;
import java.io.IOException;
@@ -23,7 +20,6 @@
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
-
import org.apache.activeio.journal.RecordLocation;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQTopic;
@@ -42,7 +38,6 @@
import org.apache.activemq.util.SubscriptionKey;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-
import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicInteger;
@@ -51,76 +46,73 @@
*
* @version $Revision: 1.13 $
*/
-public class RapidTopicMessageStore extends RapidMessageStore implements TopicMessageStore {
-
- private static final Log log = LogFactory.getLog(RapidTopicMessageStore.class);
+public class RapidTopicMessageStore extends RapidMessageStore implements TopicMessageStore{
- private HashMap ackedLastAckLocations = new HashMap();
+ private static final Log log=LogFactory.getLog(RapidTopicMessageStore.class);
+ private HashMap ackedLastAckLocations=new HashMap();
private final MapContainer subscriberContainer;
private final MapContainer ackContainer;
private final Store store;
private Map subscriberAcks=new ConcurrentHashMap();
- public RapidTopicMessageStore(RapidPersistenceAdapter adapter, ActiveMQTopic destination, MapContainer messageContainer, MapContainer subsContainer, MapContainer ackContainer) throws IOException {
- super(adapter, destination, messageContainer);
- this.subscriberContainer = subsContainer;
- this.ackContainer = ackContainer;
+ public RapidTopicMessageStore(RapidPersistenceAdapter adapter,ActiveMQTopic destination,
+ MapContainer messageContainer,MapContainer subsContainer,MapContainer ackContainer) throws IOException{
+ super(adapter,destination,messageContainer);
+ this.subscriberContainer=subsContainer;
+ this.ackContainer=ackContainer;
this.store=adapter.getStore();
-
for(Iterator i=subscriberContainer.keySet().iterator();i.hasNext();){
Object key=i.next();
addSubscriberAckContainer(key);
}
}
- public void recoverSubscription(String clientId, String subscriptionName, final MessageRecoveryListener listener) throws Exception {
-
+ public void recoverSubscription(String clientId,String subscriptionName,final MessageRecoveryListener listener)
+ throws Exception{
String key=getSubscriptionKey(clientId,subscriptionName);
- ListContainer list=(ListContainer) subscriberAcks.get(key);
+ ListContainer list=(ListContainer)subscriberAcks.get(key);
if(list!=null){
for(Iterator i=list.iterator();i.hasNext();){
Object msg=messageContainer.get(i.next());
if(msg!=null){
if(msg.getClass()==String.class){
- listener.recoverMessageReference((String) msg);
+ listener.recoverMessageReference((String)msg);
}else{
- listener.recoverMessage((Message) msg);
+ listener.recoverMessage((Message)msg);
}
}
- listener.finished();
}
+ listener.finished(); // report completion once, after the loop; inside the loop it fired per entry and never for an empty list
- } else {
+ }else{
listener.finished();
}
-
}
-
+
public void recoverNextMessages(String clientId,String subscriptionName,MessageId lastMessageId,int maxReturned,
- MessageRecoveryListener listener) throws Exception{
+ MessageRecoveryListener listener) throws Exception{
String key=getSubscriptionKey(clientId,subscriptionName);
- ListContainer list=(ListContainer) subscriberAcks.get(key);
+ ListContainer list=(ListContainer)subscriberAcks.get(key);
if(list!=null){
boolean startFound=false;
- int count = 0;
- for(Iterator i=list.iterator();i.hasNext() && count < maxReturned;){
+ int count=0;
+ for(Iterator i=list.iterator();i.hasNext()&&count<maxReturned;){
Object msg=messageContainer.get(i.next());
if(msg!=null){
if(msg.getClass()==String.class){
String ref=msg.toString();
- if (startFound || lastMessageId == null){
+ if(startFound||lastMessageId==null){
listener.recoverMessageReference(ref);
count++;
- }
- else if(startFound||ref.equals(lastMessageId.toString())){
+ }else if(!startFound&&ref.equals(lastMessageId.toString())){ // resume point only on an exact id match, same guard as the Message branch
startFound=true;
}
}else{
- Message message=(Message) msg;
- if(startFound||message.getMessageId().equals(lastMessageId)){
- startFound=true;
- }else{
+ Message message=(Message)msg;
+ if(startFound||lastMessageId==null){
listener.recoverMessage(message);
count++;
+ }else if(!startFound&&message.getMessageId().equals(lastMessageId)){
+ startFound=true;
}
}
}
@@ -131,11 +123,12 @@
}
}
- public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
- return (SubscriptionInfo) subscriberContainer.get(getSubscriptionKey(clientId,subscriptionName));
+ public SubscriptionInfo lookupSubscription(String clientId,String subscriptionName) throws IOException{
+ return (SubscriptionInfo)subscriberContainer.get(getSubscriptionKey(clientId,subscriptionName));
}
- public void addSubsciption(String clientId, String subscriptionName, String selector, boolean retroactive) throws IOException {
+ public void addSubsciption(String clientId,String subscriptionName,String selector,boolean retroactive)
+ throws IOException{
SubscriptionInfo info=new SubscriptionInfo();
info.setDestination(destination);
info.setClientId(clientId);
@@ -163,148 +156,139 @@
super.addMessage(context,message);
}
}
-
-
+
/**
*/
- public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, final MessageId messageId) throws IOException {
- final boolean debug = log.isDebugEnabled();
-
- JournalTopicAck ack = new JournalTopicAck();
+ public void acknowledge(ConnectionContext context,String clientId,String subscriptionName,final MessageId messageId)
+ throws IOException{
+ final boolean debug=log.isDebugEnabled();
+ JournalTopicAck ack=new JournalTopicAck();
ack.setDestination(destination);
ack.setMessageId(messageId);
ack.setMessageSequenceId(messageId.getBrokerSequenceId());
ack.setSubscritionName(subscriptionName);
ack.setClientId(clientId);
- ack.setTransactionId( context.getTransaction()!=null ? context.getTransaction().getTransactionId():null);
- final RecordLocation location = peristenceAdapter.writeCommand(ack, false);
-
- final SubscriptionKey key = new SubscriptionKey(clientId, subscriptionName);
- if( !context.isInTransaction() ) {
- if( debug )
+ ack.setTransactionId(context.getTransaction()!=null?context.getTransaction().getTransactionId():null);
+ final RecordLocation location=peristenceAdapter.writeCommand(ack,false);
+ final SubscriptionKey key=new SubscriptionKey(clientId,subscriptionName);
+ if(!context.isInTransaction()){
+ if(debug)
log.debug("Journalled acknowledge for: "+messageId+", at: "+location);
- acknowledge(messageId, location, key);
- } else {
- if( debug )
+ acknowledge(messageId,location,key);
+ }else{
+ if(debug)
log.debug("Journalled transacted acknowledge for: "+messageId+", at: "+location);
- synchronized (this) {
+ synchronized(this){
inFlightTxLocations.add(location);
}
- transactionStore.acknowledge(this, ack, location);
+ transactionStore.acknowledge(this,ack,location);
context.getTransaction().addSynchronization(new Synchronization(){
- public void afterCommit() throws Exception {
- if( debug )
+
+ public void afterCommit() throws Exception{
+ if(debug)
log.debug("Transacted acknowledge commit for: "+messageId+", at: "+location);
- synchronized (RapidTopicMessageStore.this) {
+ synchronized(RapidTopicMessageStore.this){
inFlightTxLocations.remove(location);
- acknowledge(messageId, location, key);
+ acknowledge(messageId,location,key);
}
}
- public void afterRollback() throws Exception {
- if( debug )
+
+ public void afterRollback() throws Exception{
+ if(debug)
log.debug("Transacted acknowledge rollback for: "+messageId+", at: "+location);
- synchronized (RapidTopicMessageStore.this) {
+ synchronized(RapidTopicMessageStore.this){
inFlightTxLocations.remove(location);
}
}
});
}
-
}
-
- public void replayAcknowledge(ConnectionContext context, String clientId, String subscritionName, MessageId messageId) {
- try {
- synchronized(this) {
+
+ public void replayAcknowledge(ConnectionContext context,String clientId,String subscritionName,MessageId messageId){
+ try{
+ synchronized(this){
String subcriberId=getSubscriptionKey(clientId,subscritionName);
String id=messageId.toString();
- ListContainer container=(ListContainer) subscriberAcks.get(subcriberId);
+ ListContainer container=(ListContainer)subscriberAcks.get(subcriberId);
if(container!=null){
- //container.remove(id);
+ // container.remove(id);
container.removeFirst();
- AtomicInteger count=(AtomicInteger) ackContainer.remove(id);
+ AtomicInteger count=(AtomicInteger)ackContainer.remove(id);
if(count!=null){
if(count.decrementAndGet()>0){
ackContainer.put(id,count);
- } else {
+ }else{
// no more references to message messageContainer so remove it
messageContainer.remove(messageId.toString());
}
}
}
}
- }
- catch (Throwable e) {
- log.debug("Could not replay acknowledge for message '" + messageId + "'. Message may have already been acknowledged. reason: " + e);
+ }catch(Throwable e){
+ log.debug("Could not replay acknowledge for message '"+messageId
+ +"'. Message may have already been acknowledged. reason: "+e);
}
}
-
/**
* @param messageId
* @param location
* @param key
*/
- private void acknowledge(MessageId messageId, RecordLocation location, SubscriptionKey key) {
- synchronized(this) {
- lastLocation = location;
- ackedLastAckLocations.put(key, messageId);
-
+ private void acknowledge(MessageId messageId,RecordLocation location,SubscriptionKey key){
+ synchronized(this){
+ lastLocation=location;
+ ackedLastAckLocations.put(key,messageId);
String subcriberId=getSubscriptionKey(key.getClientId(),key.getSubscriptionName());
String id=messageId.toString();
- ListContainer container=(ListContainer) subscriberAcks.get(subcriberId);
+ ListContainer container=(ListContainer)subscriberAcks.get(subcriberId);
if(container!=null){
- //container.remove(id);
+ // container.remove(id);
container.removeFirst();
- AtomicInteger count=(AtomicInteger) ackContainer.remove(id);
+ AtomicInteger count=(AtomicInteger)ackContainer.remove(id);
if(count!=null){
if(count.decrementAndGet()>0){
ackContainer.put(id,count);
- } else {
+ }else{
// no more references to message messageContainer so remove it
messageContainer.remove(messageId.toString());
}
}
}
- }
+ }
}
-
+
protected String getSubscriptionKey(String clientId,String subscriberName){
String result=clientId+":";
result+=subscriberName!=null?subscriberName:"NOT_SET";
return result;
}
-
- public RecordLocation checkpoint() throws IOException {
-
- ArrayList cpAckedLastAckLocations;
-
+ public RecordLocation checkpoint() throws IOException{
+ ArrayList cpAckedLastAckLocations;
// swap out the hash maps..
- synchronized (this) {
- cpAckedLastAckLocations = new ArrayList(this.ackedLastAckLocations.values());
- this.ackedLastAckLocations = new HashMap();
+ synchronized(this){
+ cpAckedLastAckLocations=new ArrayList(this.ackedLastAckLocations.values());
+ this.ackedLastAckLocations=new HashMap();
}
-
- RecordLocation rc = super.checkpoint();
- if(!cpAckedLastAckLocations.isEmpty()) {
+ RecordLocation rc=super.checkpoint();
+ if(!cpAckedLastAckLocations.isEmpty()){
Collections.sort(cpAckedLastAckLocations);
- RecordLocation t = (RecordLocation) cpAckedLastAckLocations.get(0);
- if( rc == null || t.compareTo(rc)<0 ) {
- rc = t;
+ RecordLocation t=(RecordLocation)cpAckedLastAckLocations.get(0);
+ if(rc==null||t.compareTo(rc)<0){
+ rc=t;
}
}
-
return rc;
}
-
- public void deleteSubscription(String clientId, String subscriptionName) throws IOException {
+ public void deleteSubscription(String clientId,String subscriptionName) throws IOException{
String key=getSubscriptionKey(clientId,subscriptionName);
subscriberContainer.remove(key);
- ListContainer list=(ListContainer) subscriberAcks.get(key);
+ ListContainer list=(ListContainer)subscriberAcks.get(key);
for(Iterator i=list.iterator();i.hasNext();){
String id=i.next().toString();
- AtomicInteger count=(AtomicInteger) ackContainer.remove(id);
+ AtomicInteger count=(AtomicInteger)ackContainer.remove(id);
if(count!=null){
if(count.decrementAndGet()>0){
ackContainer.put(id,count);
@@ -316,30 +300,63 @@
}
}
- public SubscriptionInfo[] getAllSubscriptions() throws IOException {
- return (SubscriptionInfo[]) subscriberContainer.values().toArray(
+ public SubscriptionInfo[] getAllSubscriptions() throws IOException{
+ return (SubscriptionInfo[])subscriberContainer.values().toArray(
new SubscriptionInfo[subscriberContainer.size()]);
}
protected void addSubscriberAckContainer(Object key) throws IOException{
- ListContainer container=store.getListContainer(key,"topic-subs");
+ ListContainer container=store.getListContainer(key,"durable-subs");
Marshaller marshaller=new StringMarshaller();
container.setMarshaller(marshaller);
subscriberAcks.put(key,container);
}
-
- public Message getNextMessageToDeliver(String clientId,String subscriptionName) throws IOException{
+
+ /**
+ * Returns the id that follows messageId in the subscription's ListContainer.
+ *
+ * @param clientId client id of the durable subscriber
+ * @param subscriptionName name of the durable subscription
+ * @param messageId id to search for; must not be null
+ * @return the id stored directly after messageId, or null when messageId is last, is absent, or no list is
+ * registered for the subscription
+ * @throws IOException declared on the signature; nothing in this body throws it directly
+ */
+ public MessageId getNextMessageIdToDeliver(String clientId,String subscriptionName,MessageId messageId)
+ throws IOException{
+ MessageId result=null;
+ boolean getNext=false;
String key=getSubscriptionKey(clientId,subscriptionName);
- ListContainer list=(ListContainer) subscriberAcks.get(key);
- Iterator iter = list.iterator();
- return (Message) (iter.hasNext() ? iter.next() : null);
-
+ ListContainer list=(ListContainer)subscriberAcks.get(key);
+ if(list==null){
+ // no list registered for this subscription: same null guard the recover* methods apply
+ return null;
+ }
+ // computed once; the loop compares every stored id against this text
+ String target=messageId.toString();
+ for(Iterator i=list.iterator();i.hasNext();){
+ String id=i.next().toString();
+ if(id.equals(target)){
+ getNext=true;
+ }else if(getNext){
+ result=new MessageId(id);
+ break;
+ }
+ }
+ return result;
}
-
+
+ /**
+ * Returns the id that precedes messageId in the subscription's ListContainer.
+ *
+ * @param clientId client id of the durable subscriber
+ * @param subscriptionName name of the durable subscription
+ * @param messageId id to search for; must not be null
+ * @return the id stored directly before messageId, or null when messageId is first, is absent, or no list is
+ * registered for the subscription
+ * @throws IOException declared on the signature; nothing in this body throws it directly
+ */
+ public MessageId getPreviousMessageIdToDeliver(String clientId,String subscriptionName,MessageId messageId)
+ throws IOException{
+ String key=getSubscriptionKey(clientId,subscriptionName);
+ ListContainer list=(ListContainer)subscriberAcks.get(key);
+ if(list==null){
+ // no list registered for this subscription: same null guard the recover* methods apply
+ return null;
+ }
+ // computed once; the loop compares every stored id against this text
+ String target=messageId.toString();
+ String previousId=null;
+ for(Iterator i=list.iterator();i.hasNext();){
+ String id=i.next().toString();
+ if(id.equals(target)){
+ // a match on the very first entry has no predecessor
+ return previousId!=null?new MessageId(previousId):null;
+ }
+ previousId=id;
+ }
+ // messageId never seen: the last id visited is not a valid answer
+ return null;
+ }
+
public int getMessageCount(String clientId,String subscriberName) throws IOException{
String key=getSubscriptionKey(clientId,subscriberName);
- ListContainer list=(ListContainer) subscriberAcks.get(key);
+ ListContainer list=(ListContainer)subscriberAcks.get(key);
return list.size();
}
+ public void resetBatching(String clientId,String subscriptionName,MessageId nextId){ // no-op: the body is empty and all three arguments are ignored
+ }
}
Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java?view=diff&rev=454368&r1=454367&r2=454368
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java (original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java Mon Oct 9 06:05:20 2006
@@ -507,7 +507,7 @@
session.commit();
// Only pick up the first message.
- Message message1 = message1 = consumer.receive(1000);
+ Message message1 = consumer.receive(1000);
assertNotNull(message1);
// Don't acknowledge yet. This should keep our prefetch full.
Added: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/CursorDurableTest.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/CursorDurableTest.java?view=auto&rev=454368
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/CursorDurableTest.java (added)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/CursorDurableTest.java Mon Oct 9 06:05:20 2006
@@ -0,0 +1,215 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.region.cursors;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TemporaryQueue;
+import javax.jms.TemporaryTopic;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+
+import junit.framework.TestCase;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.store.kahadaptor.KahaPersistenceAdapter;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import edu.emory.mathcs.backport.java.util.concurrent.CountDownLatch;
+import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
+/**
+ * Durable topic subscription tests run against a broker started on {@link #bindAddress}: one publishes every
+ * message while the subscriber is disconnected, the other keeps publishing while a deliberately slow listener
+ * is attached. Subclasses change the broker set-up by overriding {@link #configureBroker(BrokerService)}.
+ *
+ * @version $Revision: 1.3 $
+ */
+public class CursorDurableTest extends TestCase{
+
+    protected static final Log log=LogFactory.getLog(CursorDurableTest.class);
+
+    protected static final int MESSAGE_COUNT=50;
+    protected static final int PREFETCH_SIZE=5;
+    /** Longest wait, in milliseconds, for a single receive; a missing message fails the test instead of hanging it. */
+    protected static final long RECEIVE_TIMEOUT=30000;
+    protected BrokerService broker;
+    protected String bindAddress="tcp://localhost:60706";
+    protected int topicCount=0;
+
+    /**
+     * Registers the durable subscription, publishes MESSAGE_COUNT messages while the subscriber is disconnected,
+     * then reconnects and expects to receive the same messages in the same order.
+     */
+    public void testSendFirstThenConsume() throws Exception{
+        ConnectionFactory factory=createConnectionFactory();
+        // connect and disconnect once so the durable subscription exists before anything is published
+        Connection consumerConnection=getConsumerConnection(factory);
+        try{
+            getConsumer(consumerConnection);
+        }finally{
+            consumerConnection.close();
+        }
+
+        List senderList=new ArrayList();
+        Connection producerConnection=factory.createConnection();
+        try{
+            producerConnection.start();
+            Session session=producerConnection.createSession(false,Session.AUTO_ACKNOWLEDGE);
+            MessageProducer producer=session.createProducer(getTopic(session));
+            for(int i=0;i<MESSAGE_COUNT;i++){
+                Message msg=session.createTextMessage("test"+i);
+                senderList.add(msg);
+                producer.send(msg);
+            }
+        }finally{
+            producerConnection.close();
+        }
+
+        // reconnect with the same client id and subscription name and drain the backlog
+        consumerConnection=getConsumerConnection(factory);
+        try{
+            MessageConsumer consumer=getConsumer(consumerConnection);
+            List consumerList=new ArrayList();
+            for(int i=0;i<MESSAGE_COUNT;i++){
+                // bounded wait: an unbounded receive() would block the whole suite on a lost message
+                Message msg=consumer.receive(RECEIVE_TIMEOUT);
+                assertNotNull("No message received for index "+i,msg);
+                consumerList.add(msg);
+            }
+            assertEquals(senderList,consumerList);
+        }finally{
+            consumerConnection.close();
+        }
+    }
+
+    /**
+     * Publishes a tenth of the messages before the subscriber reconnects and the rest while a slow listener is
+     * consuming, then expects the listener to have seen every message in publication order.
+     */
+    public void testSendWhilstConsume() throws Exception{
+        ConnectionFactory factory=createConnectionFactory();
+        // connect and disconnect once so the durable subscription exists before anything is published
+        Connection consumerConnection=getConsumerConnection(factory);
+        try{
+            getConsumer(consumerConnection);
+        }finally{
+            consumerConnection.close();
+        }
+
+        Connection producerConnection=factory.createConnection();
+        try{
+            producerConnection.start();
+            Session session=producerConnection.createSession(false,Session.AUTO_ACKNOWLEDGE);
+            MessageProducer producer=session.createProducer(getTopic(session));
+            List senderList=new ArrayList();
+            for(int i=0;i<MESSAGE_COUNT/10;i++){
+                TextMessage msg=session.createTextMessage("test"+i);
+                senderList.add(msg);
+                producer.send(msg);
+            }
+
+            // now consume the messages while the producer keeps sending
+            consumerConnection=getConsumerConnection(factory);
+            try{
+                MessageConsumer consumer=getConsumer(consumerConnection);
+                final List consumerList=new ArrayList();
+                final CountDownLatch latch=new CountDownLatch(1);
+                consumer.setMessageListener(new MessageListener(){
+
+                    public void onMessage(Message msg){
+                        try{
+                            // sleep to act as a slow consumer
+                            // which will force a mix of direct and polled dispatching
+                            // using the cursor on the broker
+                            Thread.sleep(50);
+                        }catch(InterruptedException e){
+                            // keep the interrupt visible to the delivering thread instead of swallowing it
+                            Thread.currentThread().interrupt();
+                        }
+                        consumerList.add(msg);
+                        if(consumerList.size()==MESSAGE_COUNT){
+                            latch.countDown();
+                        }
+                    }
+                });
+                for(int i=MESSAGE_COUNT/10;i<MESSAGE_COUNT;i++){
+                    TextMessage msg=session.createTextMessage("test"+i);
+                    senderList.add(msg);
+                    producer.send(msg);
+                }
+
+                // await() returns false on timeout, so the lists are only read once the listener has finished
+                assertTrue("Still dispatching - count down latch not sprung",
+                        latch.await(300000,TimeUnit.MILLISECONDS));
+                assertEquals("consumerList size",senderList.size(),consumerList.size());
+                assertEquals(senderList,consumerList);
+            }finally{
+                consumerConnection.close();
+            }
+        }finally{
+            producerConnection.close();
+        }
+    }
+
+    /**
+     * @return the topic named after the runtime class of this test
+     */
+    protected Topic getTopic(Session session) throws JMSException{
+        String topicName=getClass().getName();
+        return session.createTopic(topicName);
+    }
+
+    /**
+     * @return a started connection whose client id is "testConsumer"
+     */
+    protected Connection getConsumerConnection(ConnectionFactory fac) throws JMSException{
+        Connection connection=fac.createConnection();
+        connection.setClientID("testConsumer");
+        connection.start();
+        return connection;
+    }
+
+    /**
+     * @return a durable subscriber named "testConsumer" on {@link #getTopic(Session)}, created on a new
+     *         auto-acknowledge session of the given connection
+     */
+    protected MessageConsumer getConsumer(Connection connection) throws Exception{
+        Session consumerSession=connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
+        Topic topic=getTopic(consumerSession);
+        MessageConsumer consumer=consumerSession.createDurableSubscriber(topic,"testConsumer");
+        return consumer;
+    }
+
+    protected void setUp() throws Exception{
+        if(broker==null){
+            broker=createBroker();
+        }
+        super.setUp();
+    }
+
+    protected void tearDown() throws Exception{
+        try{
+            super.tearDown();
+        }finally{
+            // stop the broker even when super.tearDown() fails, and drop the reference to the stopped instance
+            if(broker!=null){
+                BrokerService stopping=broker;
+                broker=null;
+                stopping.stop();
+            }
+        }
+    }
+
+    /**
+     * @return a factory for {@link #bindAddress} with both durable-topic prefetch properties set to PREFETCH_SIZE
+     */
+    protected ActiveMQConnectionFactory createConnectionFactory() throws Exception{
+        ActiveMQConnectionFactory cf=new ActiveMQConnectionFactory(bindAddress);
+        Properties props=new Properties();
+        props.setProperty("prefetchPolicy.durableTopicPrefetch",""+PREFETCH_SIZE);
+        props.setProperty("prefetchPolicy.optimizeDurableTopicPrefetch",""+PREFETCH_SIZE);
+        cf.setProperties(props);
+        return cf;
+    }
+
+    /**
+     * @return a started broker, configured through {@link #configureBroker(BrokerService)}
+     */
+    protected BrokerService createBroker() throws Exception{
+        BrokerService answer=new BrokerService();
+        configureBroker(answer);
+        answer.setDeleteAllMessagesOnStartup(true);
+        answer.start();
+        return answer;
+    }
+
+    /**
+     * Hook for subclasses: adds the connector for {@link #bindAddress} before the broker is started.
+     */
+    protected void configureBroker(BrokerService answer) throws Exception{
+        answer.addConnector(bindAddress);
+        answer.setDeleteAllMessagesOnStartup(true);
+    }
+}
Added: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/KahaCursorDurableTest.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/KahaCursorDurableTest.java?view=auto&rev=454368
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/KahaCursorDurableTest.java (added)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/KahaCursorDurableTest.java Mon Oct 9 06:05:20 2006
@@ -0,0 +1,40 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.region.cursors;
+
+import java.io.File;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.store.kahadaptor.KahaPersistenceAdapter;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+/**
+ * Runs the inherited CursorDurableTest cases with a KahaPersistenceAdapter rooted at
+ * "activemq-data/durableTest" installed on the broker.
+ *
+ * @version $Revision: 1.3 $
+ */
+public class KahaCursorDurableTest extends CursorDurableTest{
+
+    protected static final Log log=LogFactory.getLog(KahaCursorDurableTest.class);
+
+    /**
+     * Installs the Kaha adapter, then adds the connector and requests a clean start, in that order.
+     */
+    protected void configureBroker(BrokerService answer) throws Exception{
+        File dataDirectory=new File("activemq-data/durableTest");
+        answer.setPersistenceAdapter(new KahaPersistenceAdapter(dataDirectory));
+        answer.addConnector(bindAddress);
+        answer.setDeleteAllMessagesOnStartup(true);
+    }
+}