You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2006/12/28 22:03:54 UTC
svn commit: r490814 -
/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/
Author: rajdavies
Date: Thu Dec 28 13:03:53 2006
New Revision: 490814
URL: http://svn.apache.org/viewvc?view=rev&rev=490814
Log:
Use the store based cursor by default for Queues - which will enable very large queue support
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java?view=diff&rev=490814&r1=490813&r2=490814
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java Thu Dec 28 13:03:53 2006
@@ -38,6 +38,7 @@
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import sun.security.x509.IssuerAlternativeNameExtension;
import java.util.concurrent.ConcurrentHashMap;
@@ -60,6 +61,7 @@
protected final TaskRunnerFactory taskRunnerFactory;
protected final Object destinationsMutex = new Object();
protected final Map consumerChangeMutexMap = new HashMap();
+ protected boolean started = false;
public AbstractRegion(RegionBroker broker,DestinationStatistics destinationStatistics, UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
if (broker == null) {
@@ -76,9 +78,15 @@
}
public void start() throws Exception {
+ started = true;
+ for (Iterator i = destinations.values().iterator();i.hasNext();) {
+ Destination dest = (Destination)i.next();
+ dest.start();
+ }
}
public void stop() throws Exception {
+ started = false;
for (Iterator i = destinations.values().iterator();i.hasNext();) {
Destination dest = (Destination)i.next();
dest.stop();
@@ -102,7 +110,7 @@
if (destinationInterceptor != null) {
dest = destinationInterceptor.intercept(dest);
}
-
+
dest.start();
destinations.put(destination, dest);
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java?view=diff&rev=490814&r1=490813&r2=490814
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java Thu Dec 28 13:03:53 2006
@@ -77,7 +77,7 @@
if (destination.isQueue()) {
if (destination.isTemporary()) {
final ActiveMQTempDestination tempDest = (ActiveMQTempDestination) destination;
- return new Queue(destination, memoryManager, null, destinationStatistics, taskRunnerFactory) {
+ return new Queue(destination, memoryManager, null, destinationStatistics, taskRunnerFactory,broker.getTempDataStore()) {
public void addSubscription(ConnectionContext context,Subscription sub) throws Exception {
// Only consumers on the same connection can consume from
@@ -90,7 +90,7 @@
};
} else {
MessageStore store = persistenceAdapter.createQueueMessageStore((ActiveMQQueue) destination);
- Queue queue = new Queue(destination, memoryManager, store, destinationStatistics, taskRunnerFactory);
+ Queue queue = new Queue(destination, memoryManager, store, destinationStatistics, taskRunnerFactory,broker.getTempDataStore());
configureQueue(queue, destination);
queue.initialize();
return queue;
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java?view=diff&rev=490814&r1=490813&r2=490814
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java Thu Dec 28 13:03:53 2006
@@ -19,22 +19,21 @@
import java.io.IOException;
import java.util.Iterator;
+import java.util.concurrent.ConcurrentHashMap;
import javax.jms.InvalidSelectorException;
-import javax.jms.JMSException;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
-import org.apache.activemq.broker.region.cursors.FilePendingMessageCursor;
import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
-import org.apache.activemq.broker.region.cursors.StoreDurableSubscriberCursor;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.util.SubscriptionKey;
-import java.util.concurrent.ConcurrentHashMap;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
public class DurableTopicSubscription extends PrefetchSubscription {
-
+ static private final Log log=LogFactory.getLog(PrefetchSubscription.class);
private final ConcurrentHashMap redeliveredMessages = new ConcurrentHashMap();
private final ConcurrentHashMap destinations = new ConcurrentHashMap();
private final SubscriptionKey subscriptionKey;
@@ -72,6 +71,7 @@
}
public void activate(ConnectionContext context, ConsumerInfo info) throws Exception {
+ log.debug("Deactivating " + this);
if( !active ) {
this.active = true;
this.context = context;
@@ -96,7 +96,8 @@
}
}
- synchronized public void deactivate(boolean keepDurableSubsActive) throws Exception {
+ synchronized public void deactivate(boolean keepDurableSubsActive) throws Exception {
+
active=false;
synchronized(pending){
pending.stop();
@@ -197,9 +198,12 @@
"DurableTopicSubscription:" +
" consumer="+info.getConsumerId()+
", destinations="+destinations.size()+
- ", dispatched="+dispatched.size()+
- ", delivered="+this.prefetchExtension+
- ", pending="+getPendingQueueSize();
+ ", total="+enqueueCounter+
+ ", pending="+getPendingQueueSize()+
+ ", dispatched="+dispatchCounter+
+ ", inflight="+dispatched.size()+
+ ", prefetchExtension="+this.prefetchExtension;
+
}
public String getClientId() {
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?view=diff&rev=490814&r1=490813&r2=490814
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Thu Dec 28 13:03:53 2006
@@ -327,6 +327,10 @@
return (dispatched.size()-prefetchExtension) >= (info.getPrefetchSize() *.9);
}
+ public int countBeforeFull() {
+ return info.getPrefetchSize() + prefetchExtension - dispatched.size();
+ }
+
public int getPendingQueueSize(){
synchronized(pending) {
return pending.size();
@@ -396,28 +400,38 @@
List toDispatch=null;
synchronized(pending){
try{
- pending.reset();
- while(pending.hasNext()&&!isFull()){
- MessageReference node=pending.next();
- pending.remove();
- // Message may have been sitting in the pending list a while
- // waiting for the consumer to ak the message.
- if(node!=QueueMessageReference.NULL_MESSAGE&&node.isExpired()){
- continue; // just drop it.
+ int numberToDispatch=countBeforeFull();
+ if(numberToDispatch>0){
+ int count=0;
+ pending.reset();
+ while(pending.hasNext()&&!isFull()&&count<numberToDispatch){
+ MessageReference node=pending.next();
+
+ if(canDispatch(node)){
+ pending.remove();
+ // Message may have been sitting in the pending list a while
+ // waiting for the consumer to ack the message.
+ if(node!=QueueMessageReference.NULL_MESSAGE&&node.isExpired()){
+ continue; // just drop it.
+ }
+ if(toDispatch==null){
+ toDispatch=new ArrayList();
+ }
+ toDispatch.add(node);
+ count++;
+ }
}
- if(toDispatch==null){
- toDispatch=new ArrayList();
- }
- toDispatch.add(node);
}
}finally{
pending.release();
}
}
if(toDispatch!=null){
- for(int i=0;i<toDispatch.size();i++){
- MessageReference node=(MessageReference)toDispatch.get(i);
- dispatch(node);
+ synchronized(dispatched){
+ for(int i=0;i<toDispatch.size();i++){
+ MessageReference node=(MessageReference)toDispatch.get(i);
+ dispatch(node);
+ }
}
}
}finally{
@@ -458,6 +472,7 @@
}
return true;
}else{
+ QueueMessageReference n = (QueueMessageReference) node;
return false;
}
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?view=diff&rev=490814&r1=490813&r2=490814
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Thu Dec 28 13:03:53 2006
@@ -28,6 +28,7 @@
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
+import org.apache.activemq.broker.region.cursors.StoreQueueCursor;
import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor;
import org.apache.activemq.broker.region.group.MessageGroupHashBucketFactory;
import org.apache.activemq.broker.region.group.MessageGroupMap;
@@ -44,6 +45,7 @@
import org.apache.activemq.command.MessageId;
import org.apache.activemq.filter.BooleanExpression;
import org.apache.activemq.filter.MessageEvaluationContext;
+import org.apache.activemq.kaha.Store;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.selector.SelectorParser;
import org.apache.activemq.store.MessageRecoveryListener;
@@ -74,7 +76,7 @@
private final Valve dispatchValve = new Valve(true);
private final UsageManager usageManager;
private final DestinationStatistics destinationStatistics = new DestinationStatistics();
- private PendingMessageCursor messages = new VMPendingMessageCursor();
+ private PendingMessageCursor messages;
private final LinkedList pagedInMessages = new LinkedList();
private LockOwner exclusiveOwner;
@@ -92,13 +94,20 @@
private final Object exclusiveLockMutex = new Object();
private final Object doDispatchMutex = new Object();
private TaskRunner taskRunner;
+ private boolean started = false;
public Queue(ActiveMQDestination destination, final UsageManager memoryManager, MessageStore store, DestinationStatistics parentStats,
- TaskRunnerFactory taskFactory) throws Exception {
+ TaskRunnerFactory taskFactory, Store tmpStore) throws Exception {
this.destination = destination;
this.usageManager = new UsageManager(memoryManager);
this.usageManager.setLimit(Long.MAX_VALUE);
this.store = store;
+ if(destination.isTemporary()){
+ this.messages=new VMPendingMessageCursor();
+ }else{
+ this.messages=new StoreQueueCursor(this,tmpStore);
+ }
+
this.taskRunner = taskFactory.createTaskRunner(this, "Queue "+destination.getPhysicalName());
// Let the store know what usage manager we are using so that he can
@@ -118,18 +127,16 @@
if(store!=null){
// Restore the persistent messages.
messages.setUsageManager(getUsageManager());
- messages.start();
if(messages.isRecoveryRequired()){
store.recover(new MessageRecoveryListener(){
public void recoverMessage(Message message){
- // Message could have expired while it was being loaded..
- if( message.isExpired() ) {
- // TODO: remove message from store.
- return;
- }
-
- message.setRegionDestination(Queue.this);
+ // Message could have expired while it was being loaded..
+ if(message.isExpired()){
+ // TODO remove from store
+ return;
+ }
+ message.setRegionDestination(Queue.this);
synchronized(messages){
try{
messages.addMessageLast(message);
@@ -157,10 +164,12 @@
/**
* Lock a node
+ *
* @param node
* @param lockOwner
* @return true if can be locked
- * @see org.apache.activemq.broker.region.Destination#lock(org.apache.activemq.broker.region.MessageReference, org.apache.activemq.broker.region.LockOwner)
+ * @see org.apache.activemq.broker.region.Destination#lock(org.apache.activemq.broker.region.MessageReference,
+ * org.apache.activemq.broker.region.LockOwner)
*/
public boolean lock(MessageReference node,LockOwner lockOwner){
synchronized(exclusiveLockMutex){
@@ -309,46 +318,60 @@
}
public void send(final ConnectionContext context,final Message message) throws Exception{
- // There is delay between the client sending it and it arriving at the
- // destination.. it may have expired.
- if( message.isExpired() ) {
- return;
- }
-
+ // There is delay between the client sending it and it arriving at the
+ // destination.. it may have expired.
+ if(message.isExpired()){
+ if (log.isDebugEnabled()) {
+ log.debug("Expired message: " + message);
+ }
+ return;
+ }
if(context.isProducerFlowControl()){
if(usageManager.isSendFailIfNoSpace()&&usageManager.isFull()){
throw new javax.jms.ResourceAllocationException("Usage Manager memory limit reached");
}else{
usageManager.waitForSpace();
-
// The usage manager could have delayed us by the time
// we unblock the message could have expired..
- if( message.isExpired() ) {
- return;
- }
+ if(message.isExpired()){
+ if (log.isDebugEnabled()) {
+ log.debug("Expired message: " + message);
+ }
+ return;
+ }
}
}
message.setRegionDestination(this);
- if (store != null && message.isPersistent()) {
- store.addMessage(context, message);
+ if(store!=null&&message.isPersistent()){
+ store.addMessage(context,message);
}
if(context.isInTransaction()){
context.getTransaction().addSynchronization(new Synchronization(){
public void afterCommit() throws Exception{
-
- // It could take while before we receive the commit
- // operration.. by that time the message could have expired..
- if( message.isExpired() ) {
- // TODO: remove message from store.
- return;
- }
-
+ //even though the message could be expired - it won't be from the store
+ //and it's important to keep the store/cursor in step
+ synchronized(messages){
+ messages.addMessageLast(message);
+ }
+ // It could take a while before we receive the commit
+ // operation.. by that time the message could have expired..
+ if(message.isExpired()){
+ // TODO: remove message from store.
+ if (log.isDebugEnabled()) {
+ log.debug("Expired message: " + message);
+ }
+ return;
+ }
sendMessage(context,message);
}
});
}else{
+ synchronized(messages){
+ messages.addMessageLast(message);
+ }
sendMessage(context,message);
+
}
}
@@ -432,12 +455,19 @@
}
public void start() throws Exception {
+ started = true;
+ messages.start();
+ doPageIn(false);
}
public void stop() throws Exception {
+ started = false;
if( taskRunner!=null ) {
taskRunner.shutdown();
}
+ if(messages!=null){
+ messages.stop();
+ }
}
// Properties
@@ -528,6 +558,11 @@
public Message[] browse() {
ArrayList l = new ArrayList();
+ try{
+ doPageIn(true);
+ }catch(Exception e){
+ log.error("caught an exception browsing " + this,e);
+ }
synchronized(pagedInMessages) {
for (Iterator i = pagedInMessages.iterator();i.hasNext();) {
MessageReference r = (MessageReference)i.next();
@@ -538,7 +573,7 @@
l.add(m);
}
}catch(IOException e){
- log.error("caught an exception brwsing " + this,e);
+ log.error("caught an exception browsing " + this,e);
}
finally {
r.decrementReferenceCount();
@@ -850,11 +885,10 @@
return answer;
}
+
private void sendMessage(final ConnectionContext context,Message msg) throws Exception{
- synchronized(messages){
- messages.addMessageLast(msg);
- }
+
destinationStatistics.getEnqueues().increment();
destinationStatistics.getMessages().increment();
pageInMessages(false);
@@ -863,10 +897,11 @@
private List doPageIn() throws Exception{
return doPageIn(true);
}
+
private List doPageIn(boolean force) throws Exception{
final int toPageIn=maximumPagedInMessages-pagedInMessages.size();
List result=null;
- if((force || !consumers.isEmpty())&&toPageIn>0){
+ if((force||!consumers.isEmpty())&&toPageIn>0){
try{
dispatchValve.increment();
int count=0;
@@ -877,9 +912,15 @@
while(messages.hasNext()&&count<toPageIn){
MessageReference node=messages.next();
messages.remove();
- node=createMessageReference(node.getMessage());
- result.add(node);
- count++;
+ if(!node.isExpired()){
+ node=createMessageReference(node.getMessage());
+ result.add(node);
+ count++;
+ }else{
+ if (log.isDebugEnabled()) {
+ log.debug("Expired message: " + node);
+ }
+ }
}
}finally{
messages.release();
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java?view=diff&rev=490814&r1=490813&r2=490814
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java Thu Dec 28 13:03:53 2006
@@ -73,7 +73,7 @@
protected boolean canDispatch(MessageReference n) throws IOException {
QueueMessageReference node = (QueueMessageReference) n;
- if( node.isAcked() )
+ if( node.isAcked())
return false;
// Keep message groups together.
String groupId = node.getGroupID();
@@ -208,7 +208,7 @@
/**
*/
- synchronized public void destroy() {
+ public void destroy() {
}
}
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java?view=diff&rev=490814&r1=490813&r2=490814
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java Thu Dec 28 13:03:53 2006
@@ -78,7 +78,7 @@
private final Region tempQueueRegion;
private final Region tempTopicRegion;
private BrokerService brokerService;
- private boolean stopped = false;
+ private boolean started = false;
private boolean keepDurableSubsActive=false;
protected final DestinationStatistics destinationStatistics = new DestinationStatistics();
@@ -178,6 +178,7 @@
public void start() throws Exception {
((TopicRegion)topicRegion).setKeepDurableSubsActive(keepDurableSubsActive);
+ started = true;
queueRegion.start();
topicRegion.start();
tempQueueRegion.start();
@@ -185,7 +186,7 @@
}
public void stop() throws Exception {
- stopped = true;
+ started = false;
ServiceStopper ss = new ServiceStopper();
doStop(ss);
ss.throwFirstException();
@@ -245,7 +246,6 @@
if( destinations.contains(destination) ){
throw new JMSException("Destination already exists: "+destination);
}
-
Destination answer = null;
switch(destination.getDestinationType()) {
case ActiveMQDestination.QUEUE_TYPE:
@@ -366,7 +366,8 @@
}
public void send(ConnectionContext context, Message message) throws Exception {
- message.getMessageId().setBrokerSequenceId(sequenceGenerator.getNextSequenceId());
+ long si = sequenceGenerator.getNextSequenceId();
+ message.getMessageId().setBrokerSequenceId(si);
if (message.getTimestamp() > 0 && (message.getBrokerPath() == null || message.getBrokerPath().length == 0)) {
//timestamp not been disabled and has not passed through a network
message.setTimestamp(System.currentTimeMillis());
@@ -541,7 +542,7 @@
}
public boolean isStopped(){
- return stopped;
+ return !started;
}
public Set getDurableDestinations(){