You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2006/09/02 09:03:31 UTC
svn commit: r439552 - in /incubator/activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/broker/
main/java/org/apache/activemq/broker/region/
main/java/org/apache/activemq/broker/region/cursors/
main/java/org/apache/activemq/kaha/impl/ test/...
Author: rajdavies
Date: Sat Sep 2 00:03:30 2006
New Revision: 439552
URL: http://svn.apache.org/viewvc?rev=439552&view=rev
Log:
some ground work for http://issues.apache.org/activemq/browse/AMQ-845
changed pending linked list to use a PendingMessageCursor interface instead
Added:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java (with props)
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java (with props)
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java (with props)
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/InactiveDurableTopicTest.java (with props)
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/impl/CachedListContainerImplTest.java
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java?rev=439552&r1=439551&r2=439552&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java Sat Sep 2 00:03:30 2006
@@ -33,6 +33,7 @@
import org.apache.activemq.command.Response;
import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.TransactionId;
+import org.apache.activemq.kaha.Store;
/**
* The Message Broker which routes messages,
@@ -251,4 +252,12 @@
* Sets the default administration connection context used when configuring the broker on startup or via JMX
*/
public abstract void setAdminConnectionContext(ConnectionContext adminConnectionContext);
+
+
+ /**
+ * @return the broker's temp data store
+ * @throws Exception
+ */
+
+ public Store getTempDataStore();
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java?rev=439552&r1=439551&r2=439552&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java Sat Sep 2 00:03:30 2006
@@ -35,6 +35,7 @@
import org.apache.activemq.command.Response;
import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.TransactionId;
+import org.apache.activemq.kaha.Store;
import java.util.Map;
import java.util.Set;
@@ -231,5 +232,10 @@
public void setAdminConnectionContext(ConnectionContext adminConnectionContext) {
next.setAdminConnectionContext(adminConnectionContext);
}
+
+ public Store getTempDataStore() {
+ return next.getTempDataStore();
+ }
+
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java?rev=439552&r1=439551&r2=439552&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java Sat Sep 2 00:03:30 2006
@@ -17,9 +17,19 @@
*/
package org.apache.activemq.broker;
-import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArrayList;
-import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
-
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import javax.management.MBeanServer;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
import org.apache.activemq.ActiveMQConnectionMetaData;
import org.apache.activemq.Service;
import org.apache.activemq.advisory.AdvisoryBroker;
@@ -34,15 +44,19 @@
import org.apache.activemq.broker.jmx.NetworkConnectorView;
import org.apache.activemq.broker.jmx.NetworkConnectorViewMBean;
import org.apache.activemq.broker.jmx.ProxyConnectorView;
+import org.apache.activemq.broker.region.CompositeDestinationInterceptor;
import org.apache.activemq.broker.region.DestinationFactory;
import org.apache.activemq.broker.region.DestinationFactoryImpl;
-import org.apache.activemq.broker.region.CompositeDestinationInterceptor;
import org.apache.activemq.broker.region.DestinationInterceptor;
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.broker.region.policy.PolicyMap;
-import org.apache.activemq.broker.region.virtual.*;
+import org.apache.activemq.broker.region.virtual.VirtualDestination;
+import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor;
+import org.apache.activemq.broker.region.virtual.VirtualTopic;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.BrokerId;
+import org.apache.activemq.kaha.Store;
+import org.apache.activemq.kaha.StoreFactory;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.network.ConnectionFilter;
import org.apache.activemq.network.DiscoveryNetworkConnector;
@@ -64,24 +78,8 @@
import org.apache.activemq.util.URISupport;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-
-import javax.management.InstanceNotFoundException;
-import javax.management.JMException;
-import javax.management.MBeanRegistrationException;
-import javax.management.MBeanServer;
-import javax.management.MalformedObjectNameException;
-import javax.management.ObjectName;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.Serializable;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
+import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArrayList;
+import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
/**
* Manages the lifecycle of an ActiveMQ Broker. A BrokerService consists of a number of transport
@@ -105,6 +103,7 @@
private boolean shutdownOnMasterFailure = false;
private String brokerName = "localhost";
private File dataDirectory;
+ private File tmpDataDirectory;
private Broker broker;
private BrokerView adminView;
private ManagementContext managementContext;
@@ -139,6 +138,7 @@
private BrokerId brokerId;
private DestinationInterceptor[] destinationInterceptors;
private ActiveMQDestination[] destinations;
+ private Store tempDataStore;
/**
* Adds a new transport connector for the given bind address
@@ -530,6 +530,24 @@
public void setDataDirectory(File dataDirectory) {
this.dataDirectory = dataDirectory;
}
+
+ /**
+ * @return the tmpDataDirectory
+ */
+ public File getTmpDataDirectory(){
+ if (tmpDataDirectory == null) {
+ tmpDataDirectory = new File(getDataDirectory(), "tmp_storage");
+ }
+ return tmpDataDirectory;
+ }
+
+ /**
+ * @param tmpDataDirectory the tmpDataDirectory to set
+ */
+ public void setTmpDataDirectory(File tmpDataDirectory){
+ this.tmpDataDirectory=tmpDataDirectory;
+ }
+
public void setPersistenceFactory(PersistenceAdapterFactory persistenceFactory) {
this.persistenceFactory = persistenceFactory;
@@ -906,6 +924,29 @@
public void setDestinations(ActiveMQDestination[] destinations) {
this.destinations = destinations;
}
+
+ /**
+ * @return the tempDataStore
+ */
+ public Store getTempDataStore() {
+ if (tempDataStore == null){
+ String name = getTmpDataDirectory().getPath();
+ try {
+ StoreFactory.delete(name);
+ tempDataStore = StoreFactory.open(name,"rw");
+ }catch(IOException e){
+ throw new RuntimeException(e);
+ }
+ }
+ return tempDataStore;
+ }
+
+ /**
+ * @param tempDataStore the tempDataStore to set
+ */
+ public void setTempDataStore(Store tempDataStore){
+ this.tempDataStore=tempDataStore;
+ }
// Implementation methods
// -------------------------------------------------------------------------
@@ -1386,5 +1427,6 @@
masterConnector = (MasterConnector) service;
}
}
-
+
+
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java?rev=439552&r1=439551&r2=439552&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java Sat Sep 2 00:03:30 2006
@@ -35,6 +35,7 @@
import org.apache.activemq.command.Response;
import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.TransactionId;
+import org.apache.activemq.kaha.Store;
import java.util.Collections;
import java.util.Map;
@@ -227,6 +228,10 @@
public Response messagePull(ConnectionContext context, MessagePull pull) {
+ return null;
+ }
+
+ public Store getTempDataStore() {
return null;
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java?rev=439552&r1=439551&r2=439552&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java Sat Sep 2 00:03:30 2006
@@ -39,6 +39,7 @@
import org.apache.activemq.command.Response;
import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.TransactionId;
+import org.apache.activemq.kaha.Store;
/**
* Implementation of the broker where all it's methods throw an
@@ -227,6 +228,10 @@
}
public Response messagePull(ConnectionContext context, MessagePull pull) {
+ throw new BrokerStoppedException(this.message);
+ }
+
+ public Store getTempDataStore() {
throw new BrokerStoppedException(this.message);
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java?rev=439552&r1=439551&r2=439552&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java Sat Sep 2 00:03:30 2006
@@ -35,6 +35,7 @@
import org.apache.activemq.command.Response;
import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.TransactionId;
+import org.apache.activemq.kaha.Store;
import java.util.Map;
import java.util.Set;
@@ -242,6 +243,10 @@
public Response messagePull(ConnectionContext context, MessagePull pull) throws Exception {
return getNext().messagePull(context, pull);
+ }
+
+ public Store getTempDataStore() {
+ return getNext().getTempDataStore();
}
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java?rev=439552&r1=439551&r2=439552&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java Sat Sep 2 00:03:30 2006
@@ -19,17 +19,16 @@
import java.io.IOException;
import java.util.Iterator;
-
import javax.jms.InvalidSelectorException;
-
+import javax.jms.JMSException;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.region.cursors.FilePendingMessageCursor;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.util.SubscriptionKey;
-
import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
public class DurableTopicSubscription extends PrefetchSubscription {
@@ -41,7 +40,8 @@
private boolean active=false;
public DurableTopicSubscription(Broker broker,ConnectionContext context, ConsumerInfo info, boolean keepDurableSubsActive) throws InvalidSelectorException {
- super(broker,context, info);
+ //super(broker,context, info, new FilePendingMessageCursor(context.getClientId() + info.getConsumerId().toString(),broker.getTempDataStore()));
+ super(broker,context,info);
this.keepDurableSubsActive = keepDurableSubsActive;
subscriptionKey = new SubscriptionKey(context.getClientId(), info.getSubcriptionName());
}
@@ -102,7 +102,7 @@
}
if( keepDurableSubsActive ) {
synchronized(pending) {
- pending.addFirst(node);
+ pending.addMessageFirst(node);
}
} else {
node.decrementReferenceCount();
@@ -112,10 +112,11 @@
if( !keepDurableSubsActive ) {
synchronized(pending) {
- for (Iterator iter = pending.iterator(); iter.hasNext();) {
- MessageReference node = (MessageReference) iter.next();
+ pending.reset();
+ while(pending.hasNext()) {
+ MessageReference node = pending.next();
node.decrementReferenceCount();
- iter.remove();
+ pending.remove();
}
}
}
@@ -189,8 +190,9 @@
synchronized public void destroy() {
synchronized(pending) {
- for (Iterator iter = pending.iterator(); iter.hasNext();) {
- MessageReference node = (MessageReference) iter.next();
+ pending.reset();
+ while(pending.hasNext()) {
+ MessageReference node = pending.next();
node.decrementReferenceCount();
}
pending.clear();
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=439552&r1=439551&r2=439552&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Sat Sep 2 00:03:30 2006
@@ -20,12 +20,14 @@
import java.io.IOException;
import java.util.Iterator;
import java.util.LinkedList;
-
import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
-
+import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.region.cursors.FilePendingMessageCursor;
+import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
+import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor;
import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConsumerControl;
@@ -50,7 +52,7 @@
abstract public class PrefetchSubscription extends AbstractSubscription{
static private final Log log=LogFactory.getLog(PrefetchSubscription.class);
- final protected LinkedList pending=new LinkedList();
+ final protected PendingMessageCursor pending;
final protected LinkedList dispatched=new LinkedList();
protected int prefetchExtension=0;
@@ -59,10 +61,16 @@
long enqueueCounter;
long dispatchCounter;
long dequeueCounter;
-
- public PrefetchSubscription(Broker broker,ConnectionContext context,ConsumerInfo info)
- throws InvalidSelectorException{
+
+ public PrefetchSubscription(Broker broker,ConnectionContext context,ConsumerInfo info, PendingMessageCursor cursor)
+ throws InvalidSelectorException{
super(broker,context,info);
+ pending = cursor;
+ }
+
+ public PrefetchSubscription(Broker broker,ConnectionContext context,ConsumerInfo info)
+ throws InvalidSelectorException{
+ this(broker,context,info,new VMPendingMessageCursor());
}
@@ -77,8 +85,8 @@
prefetchExtension++;
final long dispatchCounterBeforePull = dispatchCounter;
- dispatchMatched();
-
+ dispatchMatched();
+
// If there was nothing dispatched.. we may need to setup a timeout.
if( dispatchCounterBeforePull == dispatchCounter ) {
// imediate timeout used by receiveNoWait()
@@ -86,7 +94,7 @@
// Send a NULL message.
add(QueueMessageReference.NULL_MESSAGE);
dispatchMatched();
- }
+ }
if( pull.getTimeout() > 0 ) {
Scheduler.executeAfterDelay(new Runnable(){
public void run() {
@@ -124,17 +132,18 @@
if( pending.isEmpty() ) {
log.debug("Prefetch limit.");
}
- pending.addLast(node);
+ pending.addMessageLast(node);
}
}
}
synchronized public void processMessageDispatchNotification(MessageDispatchNotification mdn) throws Exception {
synchronized(pending){
- for(Iterator i=pending.iterator();i.hasNext();){
- MessageReference node=(MessageReference) i.next();
+ pending.reset();
+ while(pending.hasNext()){
+ MessageReference node=pending.next();
if(node.getMessageId().equals(mdn.getMessageId())){
- i.remove();
+ pending.remove();
createMessageDispatch(node,node.getMessage());
dispatched.addLast(node);
return;
@@ -329,9 +338,10 @@
if(!dispatching){
dispatching=true;
try{
- for(Iterator iter=pending.iterator();iter.hasNext()&&!isFull();){
- MessageReference node=(MessageReference) iter.next();
- iter.remove();
+ pending.reset();
+ while(pending.hasNext()&&!isFull()){
+ MessageReference node=pending.next();
+ pending.remove();
dispatch(node);
}
}finally{
@@ -352,8 +362,8 @@
// NULL messages don't count... they don't get Acked.
if( node != QueueMessageReference.NULL_MESSAGE ) {
- dispatchCounter++;
- dispatched.addLast(node);
+ dispatchCounter++;
+ dispatched.addLast(node);
} else {
prefetchExtension=Math.max(0,prefetchExtension-1);
}
@@ -380,8 +390,8 @@
synchronized protected void onDispatch(final MessageReference node,final Message message){
if(node.getRegionDestination()!=null){
if( node != QueueMessageReference.NULL_MESSAGE ) {
- node.getRegionDestination().getDestinationStatistics().onMessageDequeue(message);
- context.getConnection().getStatistics().onMessageDequeue(message);
+ node.getRegionDestination().getDestinationStatistics().onMessageDequeue(message);
+ context.getConnection().getStatistics().onMessageDequeue(message);
}
try{
dispatchMatched();
@@ -412,19 +422,19 @@
*/
protected MessageDispatch createMessageDispatch(MessageReference node,Message message){
if( node == QueueMessageReference.NULL_MESSAGE ) {
- MessageDispatch md = new MessageDispatch();
+ MessageDispatch md=new MessageDispatch();
md.setMessage(null);
- md.setConsumerId( info.getConsumerId() );
+ md.setConsumerId(info.getConsumerId());
md.setDestination( null );
return md;
} else {
MessageDispatch md=new MessageDispatch();
md.setConsumerId(info.getConsumerId());
- md.setDestination(node.getRegionDestination().getActiveMQDestination());
- md.setMessage(message);
- md.setRedeliveryCounter(node.getRedeliveryCounter());
- return md;
- }
+ md.setDestination(node.getRegionDestination().getActiveMQDestination());
+ md.setMessage(message);
+ md.setRedeliveryCounter(node.getRedeliveryCounter());
+ return md;
+ }
}
/**
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java?rev=439552&r1=439551&r2=439552&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java Sat Sep 2 00:03:30 2006
@@ -18,13 +18,13 @@
package org.apache.activemq.broker.region;
import java.io.IOException;
-
import javax.jms.InvalidSelectorException;
-
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.filter.MessageEvaluationContext;
public class QueueBrowserSubscription extends QueueSubscription {
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java?rev=439552&r1=439551&r2=439552&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java Sat Sep 2 00:03:30 2006
@@ -17,6 +17,9 @@
*/
package org.apache.activemq.broker.region;
+import java.io.IOException;
+import javax.jms.InvalidSelectorException;
+import javax.jms.JMSException;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.group.MessageGroupMap;
@@ -28,11 +31,6 @@
import org.apache.activemq.transaction.Synchronization;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-
-import javax.jms.InvalidSelectorException;
-import javax.jms.JMSException;
-
-import java.io.IOException;
public class QueueSubscription extends PrefetchSubscription implements LockOwner {
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java?rev=439552&r1=439551&r2=439552&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java Sat Sep 2 00:03:30 2006
@@ -42,6 +42,7 @@
import org.apache.activemq.command.Response;
import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.TransactionId;
+import org.apache.activemq.kaha.Store;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.memory.MemoryPersistenceAdapter;
@@ -572,5 +573,9 @@
public void setAdminConnectionContext(ConnectionContext adminConnectionContext) {
this.adminConnectionContext = adminConnectionContext;
+ }
+
+ public Store getTempDataStore() {
+ return brokerService.getTempDataStore();
}
}
Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java?rev=439552&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java Sat Sep 2 00:03:30 2006
@@ -0,0 +1,132 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.activemq.broker.region.cursors;
+import java.io.IOException;
+import java.util.*;
+import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.broker.region.MessageReference;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.kaha.*;
+import org.apache.activemq.openwire.OpenWireFormat;
+import org.apache.activemq.store.kahadaptor.CommandMarshaller;
+/**
+ * perist pending messages pending message (messages awaiting disptach to a consumer) cursor
+ *
+ * @version $Revision$
+ */
+public class FilePendingMessageCursor implements PendingMessageCursor{
+ private ListContainer list;
+ private Iterator iter = null;
+ private Destination regionDestination;
+
+ /**
+ * @param name
+ * @param store
+ * @throws IOException
+ */
+ public FilePendingMessageCursor(String name, Store store) {
+ try{
+ list = store.getListContainer(name);
+ list.setMarshaller(new CommandMarshaller(new OpenWireFormat()));
+ list.setMaximumCacheSize(0);
+ }catch(IOException e){
+ throw new RuntimeException(e);
+ }
+ }
+ /**
+ * @return true if there are no pending messages
+ */
+ public boolean isEmpty(){
+ return list.isEmpty();
+ }
+
+ /**
+ * reset the cursor
+ *
+ */
+ public void reset(){
+ iter = list.listIterator();
+ }
+
+ /**
+ * add message to await dispatch
+ *
+ * @param node
+ */
+ public void addMessageLast(MessageReference node){
+ try{
+ regionDestination = node.getMessage().getRegionDestination();
+ node.decrementReferenceCount();
+ }catch(IOException e){
+ throw new RuntimeException(e);
+ }
+ list.addLast(node);
+ }
+
+ /**
+ * add message to await dispatch
+ * @param position
+ * @param node
+ */
+ public void addMessageFirst(MessageReference node){
+ try{
+ regionDestination = node.getMessage().getRegionDestination();
+ node.decrementReferenceCount();
+ }catch(IOException e){
+ throw new RuntimeException(e);
+ }
+ list.addFirst(node);
+ }
+
+
+ /**
+ * @return true if there pending messages to dispatch
+ */
+ public boolean hasNext(){
+ return iter.hasNext();
+ }
+
+ /**
+ * @return the next pending message
+ */
+ public MessageReference next(){
+ Message message = (Message) iter.next();
+ message.setRegionDestination(regionDestination);
+ message.incrementReferenceCount();
+ return message;
+ }
+
+ /**
+ * remove the message at the cursor position
+ *
+ */
+ public void remove(){
+ iter.remove();
+ }
+
+ /**
+ * @return the number of pending messages
+ */
+ public int size(){
+ return list.size();
+ }
+
+ /**
+ * clear all pending messages
+ *
+ */
+ public void clear(){
+ list.clear();
+ }
+}
Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
------------------------------------------------------------------------------
svn:executable = *
Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
------------------------------------------------------------------------------
svn:keywords = Date Author Id Revision HeadURL
Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java?rev=439552&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java Sat Sep 2 00:03:30 2006
@@ -0,0 +1,73 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.activemq.broker.region.cursors;
+
+import org.apache.activemq.broker.region.MessageReference;
+
+/**
+ * Interface to pending message (messages awaiting disptach to a consumer) cursor
+ *
+ * @version $Revision$
+ */
+public interface PendingMessageCursor{
+ /**
+ * @return true if there are no pending messages
+ */
+ public boolean isEmpty();
+
+ /**
+ * reset the cursor
+ *
+ */
+ public void reset();
+
+ /**
+ * add message to await dispatch
+ * @param node
+ */
+ public void addMessageLast(MessageReference node);
+
+ /**
+ * add message to await dispatch
+ * @param node
+ */
+ public void addMessageFirst(MessageReference node);
+
+ /**
+ * @return true if there pending messages to dispatch
+ */
+ public boolean hasNext();
+
+ /**
+ * @return the next pending message
+ */
+ public MessageReference next();
+
+ /**
+ * remove the message at the cursor position
+ *
+ */
+ public void remove();
+
+ /**
+ * @return the number of pending messages
+ */
+ public int size();
+
+ /**
+ * clear all pending messages
+ *
+ */
+ public void clear();
+}
Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java
------------------------------------------------------------------------------
svn:executable = *
Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java
------------------------------------------------------------------------------
svn:keywords = Date Author Id Revision HeadURL
Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java?rev=439552&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java Sat Sep 2 00:03:30 2006
@@ -0,0 +1,96 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.activemq.broker.region.cursors;
+import java.util.Iterator;
+import java.util.LinkedList;
+import org.apache.activemq.broker.region.MessageReference;
+/**
+ * hold pending messages in a linked list (messages awaiting disptach to a consumer) cursor
+ *
+ * @version $Revision$
+ */
+public class VMPendingMessageCursor implements PendingMessageCursor{
+ private LinkedList list = new LinkedList();
+ private Iterator iter = null;
+ /**
+ * @return true if there are no pending messages
+ */
+ public boolean isEmpty(){
+ return list.isEmpty();
+ }
+
+ /**
+ * reset the cursor
+ *
+ */
+ public void reset(){
+ iter = list.listIterator();
+ }
+
+ /**
+ * add message to await dispatch
+ *
+ * @param node
+ */
+ public void addMessageLast(MessageReference node){
+ list.addLast(node);
+ }
+
+ /**
+ * add message to await dispatch
+ * @param position
+ * @param node
+ */
+ public void addMessageFirst(MessageReference node){
+ list.addFirst(node);
+ }
+
+
+ /**
+ * @return true if there pending messages to dispatch
+ */
+ public boolean hasNext(){
+ return iter.hasNext();
+ }
+
+ /**
+ * @return the next pending message
+ */
+ public MessageReference next(){
+ return (MessageReference) iter.next();
+ }
+
+ /**
+ * remove the message at the cursor position
+ *
+ */
+ public void remove(){
+ iter.remove();
+ }
+
+ /**
+ * @return the number of pending messages
+ */
+ public int size(){
+ return list.size();
+ }
+
+ /**
+ * clear all pending messages
+ *
+ */
+ public void clear(){
+ list.clear();
+ }
+}
Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java
------------------------------------------------------------------------------
svn:executable = *
Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java
------------------------------------------------------------------------------
svn:keywords = Date Author Id Revision HeadURL
Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java?rev=439552&r1=439551&r2=439552&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java Sat Sep 2 00:03:30 2006
@@ -59,7 +59,8 @@
public KahaStore(String name,String mode) throws IOException{
this.name=name;
this.mode=mode;
- initialize();
+ directory=new File(name);
+ directory.mkdirs();
}
public synchronized void close() throws IOException{
@@ -113,22 +114,22 @@
}
public synchronized boolean delete() throws IOException{
- initialize();
- clear();
boolean result=true;
- for(Iterator iter=indexManagers.values().iterator();iter.hasNext();){
- IndexManager im=(IndexManager) iter.next();
- result&=im.delete();
- iter.remove();
- }
- for(Iterator iter=dataManagers.values().iterator();iter.hasNext();){
- DataManager dm=(DataManager) iter.next();
- result&=dm.delete();
- iter.remove();
- }
- // now delete all the files - containers that don't use the standard DataManager
- // and IndexManager will not have initialized the files - so these will be left around
- // unless we do this
+ if (initialized){
+ clear();
+
+ for(Iterator iter=indexManagers.values().iterator();iter.hasNext();){
+ IndexManager im=(IndexManager) iter.next();
+ result&=im.delete();
+ iter.remove();
+ }
+ for(Iterator iter=dataManagers.values().iterator();iter.hasNext();){
+ DataManager dm=(DataManager) iter.next();
+ result&=dm.delete();
+ iter.remove();
+ }
+ }
+
if(directory!=null&&directory.isDirectory()){
File[] files=directory.listFiles();
if(files!=null){
@@ -248,10 +249,8 @@
throw new IOException("Store has been closed.");
if(!initialized){
initialized=true;
- directory=new File(name);
- directory.mkdirs();
- log.info("Kaha Store using data directory " + directory);
+ log.info("Kaha Store using data directory " + directory);
DataManager defaultDM = getDataManager(DEFAULT_CONTAINER_NAME);
rootIndexManager = getIndexManager(defaultDM, DEFAULT_CONTAINER_NAME);
Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/impl/CachedListContainerImplTest.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/impl/CachedListContainerImplTest.java?rev=439552&r1=439551&r2=439552&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/impl/CachedListContainerImplTest.java (original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/kaha/impl/CachedListContainerImplTest.java Sat Sep 2 00:03:30 2006
@@ -31,7 +31,9 @@
protected int MAX_CACHE_SIZE=10;
protected KahaStore getStore() throws IOException{
- return new KahaStore(name,"rw");
+ KahaStore store = new KahaStore(name,"rw");
+ store.initialize();
+ return store;
}
public void testAdds() throws Exception{
Added: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/InactiveDurableTopicTest.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/InactiveDurableTopicTest.java?rev=439552&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/InactiveDurableTopicTest.java (added)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/InactiveDurableTopicTest.java Sat Sep 2 00:03:30 2006
@@ -0,0 +1,133 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.activemq.perf;
+
+import java.io.File;
+import java.net.URI;
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.Message;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.Topic;
+import javax.jms.TopicSubscriber;
+import junit.framework.AssertionFailedError;
+import junit.framework.TestCase;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.store.kahadaptor.KahaPersistenceAdapter;
+/**
+ * @version $Revision$
+ */
+public class InactiveDurableTopicTest extends TestCase{
+ private static final String DEFAULT_PASSWORD="";
+ private static final String USERNAME="testuser";
+ private static final String CLIENTID="mytestclient";
+ private static final String TOPIC_NAME="testevent";
+ private static final String SUBID="subscription1";
+ private static final int deliveryMode=javax.jms.DeliveryMode.PERSISTENT;
+ private static final int deliveryPriority=javax.jms.Message.DEFAULT_PRIORITY;
+ private Connection connection=null;
+ private MessageProducer publisher=null;
+ private TopicSubscriber subscriber=null;
+ private Topic topic=null;
+ private Session session=null;
+ ActiveMQConnectionFactory connectionFactory=null;
+ BrokerService broker;
+
+ protected void setUp() throws Exception{
+ super.setUp();
+ broker=new BrokerService();
+
+ broker.setPersistenceAdapter(new KahaPersistenceAdapter(new File ("TEST_STUFD")));
+ broker.addConnector(ActiveMQConnectionFactory.DEFAULT_BROKER_URL);
+ broker.start();
+ connectionFactory=new ActiveMQConnectionFactory(ActiveMQConnectionFactory.DEFAULT_BROKER_URL);
+ /*
+ * Doesn't matter if you enable or disable these, so just leaving them out for this test case
+ * connectionFactory.setAlwaysSessionAsync(true); connectionFactory.setAsyncDispatch(true);
+ */
+ connectionFactory.setUseAsyncSend(true);
+ }
+
+ protected void tearDown() throws Exception{
+ super.tearDown();
+ broker.stop();
+ }
+
+ public void test1CreateSubscription() throws Exception{
+ try{
+ /*
+ * Step 1 - Establish a connection with a client id and create a durable subscription
+ */
+ connection=connectionFactory.createConnection(USERNAME,DEFAULT_PASSWORD);
+ assertNotNull(connection);
+ connection.setClientID(CLIENTID);
+ session=connection.createSession(false,javax.jms.Session.CLIENT_ACKNOWLEDGE);
+ assertNotNull(session);
+ topic=session.createTopic(TOPIC_NAME);
+ assertNotNull(topic);
+ subscriber=session.createDurableSubscriber(topic,SUBID,"",false);
+ assertNotNull(subscriber);
+ subscriber.close();
+ session.close();
+ connection.close();
+ }catch(JMSException ex){
+ try{
+ connection.close();
+ }catch(Exception ignore){}
+ throw new AssertionFailedError("Create Subscription caught: "+ex);
+ }
+ }
+
+ public void test2ProducerTestCase(){
+ /*
+ * Step 2 - Establish a connection without a client id and create a producer and start pumping messages. We will
+ * get hung
+ */
+ try{
+ connection=connectionFactory.createConnection(USERNAME,DEFAULT_PASSWORD);
+ assertNotNull(connection);
+ session=connection.createSession(false,javax.jms.Session.CLIENT_ACKNOWLEDGE);
+ assertNotNull(session);
+ topic=session.createTopic(TOPIC_NAME);
+ assertNotNull(topic);
+ publisher=session.createProducer(topic);
+ assertNotNull(publisher);
+ MapMessage msg=session.createMapMessage();
+ assertNotNull(msg);
+ msg.setString("key1","value1");
+ int loop;
+ for(loop=0;loop<100000;loop++){
+ msg.setInt("key2",loop);
+ publisher.send(msg,deliveryMode,deliveryPriority,Message.DEFAULT_TIME_TO_LIVE);
+ if (loop%500==0){
+ System.out.println("Sent " + loop + " messages");
+ }
+ }
+ this.assertEquals(loop,100000);
+ publisher.close();
+ session.close();
+ connection.stop();
+ connection.stop();
+ }catch(JMSException ex){
+ try{
+ connection.close();
+ }catch(Exception ignore){}
+ throw new AssertionFailedError("Create Subscription caught: "+ex);
+ }
+ }
+}
Propchange: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/InactiveDurableTopicTest.java
------------------------------------------------------------------------------
svn:keywords = Date Author Id Revision HeadURL
Propchange: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/InactiveDurableTopicTest.java
------------------------------------------------------------------------------
svn:mime-type = text/plain