You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2008/01/22 15:28:13 UTC
svn commit: r614206 - in
/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker:
jmx/ region/ region/cursors/ region/policy/
Author: rajdavies
Date: Tue Jan 22 06:28:10 2008
New Revision: 614206
URL: http://svn.apache.org/viewvc?rev=614206&view=rev
Log:
Fix for https://issues.apache.org/activemq/browse/AMQ-1562
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingDurableSubscriberMessageStoragePolicy.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingQueueMessageStoragePolicy.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingSubscriberMessageStoragePolicy.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PendingDurableSubscriberMessageStoragePolicy.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PendingQueueMessageStoragePolicy.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PendingSubscriberMessageStoragePolicy.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/StorePendingDurableSubscriberMessageStoragePolicy.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/StorePendingQueueMessageStoragePolicy.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingDurableSubscriberMessageStoragePolicy.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingQueueMessageStoragePolicy.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingSubscriberMessageStoragePolicy.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java?rev=614206&r1=614205&r2=614206&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java Tue Jan 22 06:28:10 2008
@@ -93,7 +93,7 @@
return broker.getDestinationStatistics().getMessagesCached().getCount();
}
- public int getMemoryPercentageUsed() {
+ public int getMemoryPercentUsage() {
return brokerService.getSystemUsage().getMemoryUsage().getPercentUsage();
}
@@ -109,16 +109,16 @@
return brokerService.getSystemUsage().getStoreUsage().getLimit();
}
- public int getStorePercentageUsed() {
+ public int getStorePercentUsage() {
return brokerService.getSystemUsage().getStoreUsage().getPercentUsage();
}
- public long getTmpLimit() {
+ public long getTempLimit() {
return brokerService.getSystemUsage().getTempUsage().getLimit();
}
- public int getTmpPercentageUsed() {
+ public int getTempPercentUsage() {
return brokerService.getSystemUsage().getTempUsage().getPercentUsage();
}
@@ -126,7 +126,7 @@
brokerService.getSystemUsage().getStoreUsage().setLimit(limit);
}
- public void setTmpLimit(long limit) {
+ public void setTempLimit(long limit) {
brokerService.getSystemUsage().getTempUsage().setLimit(limit);
}
@@ -172,7 +172,7 @@
}
public ObjectName[] getTopicSubscribers() {
- return broker.getTemporaryTopicSubscribers();
+ return broker.getTopicSubscribers();
}
public ObjectName[] getDurableTopicSubscribers() {
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java?rev=614206&r1=614205&r2=614206&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java Tue Jan 22 06:28:10 2008
@@ -61,23 +61,23 @@
long getTotalMessageCount();
- int getMemoryPercentageUsed();
+ int getMemoryPercentUsage();
long getMemoryLimit();
void setMemoryLimit(long limit);
- int getStorePercentageUsed();
+ int getStorePercentUsage();
long getStoreLimit();
void setStoreLimit(long limit);
- int getTmpPercentageUsed();
+ int getTempPercentUsage();
- long getTmpLimit();
+ long getTempLimit();
- void setTmpLimit(long limit);
+ void setTempLimit(long limit);
boolean isPersistent();
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java?rev=614206&r1=614205&r2=614206&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java Tue Jan 22 06:28:10 2008
@@ -93,7 +93,7 @@
return destination.getDestinationStatistics().getMessagesCached().getCount();
}
- public int getMemoryPercentageUsed() {
+ public int getMemoryPercentUsage() {
return destination.getMemoryUsage().getPercentUsage();
}
@@ -294,7 +294,7 @@
}
- public float getMemoryLimitPortion() {
+ public float getMemoryUsagePortion() {
return destination.getMemoryUsage().getUsagePortion();
}
@@ -306,7 +306,7 @@
return destination.isProducerFlowControl();
}
- public void setMemoryLimitPortion(float value) {
+ public void setMemoryUsagePortion(float value) {
destination.getMemoryUsage().setUsagePortion(value);
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java?rev=614206&r1=614205&r2=614206&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java Tue Jan 22 06:28:10 2008
@@ -127,7 +127,7 @@
/**
* @return the percentage of amount of memory used
*/
- int getMemoryPercentageUsed();
+ int getMemoryPercentUsage();
/**
* @return the amount of memory allocated to this destination
@@ -143,13 +143,13 @@
/**
* @return the portion of memory from the broker memory limit for this destination
*/
- float getMemoryLimitPortion();
+ float getMemoryUsagePortion();
/**
* set the portion of memory from the broker memory limit for this destination
* @param value
*/
- void setMemoryLimitPortion(float value);
+ void setMemoryUsagePortion(float value);
/**
* Browses the current destination returning a list of messages
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java?rev=614206&r1=614205&r2=614206&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java Tue Jan 22 06:28:10 2008
@@ -139,10 +139,7 @@
return destination;
}
- public final String getDestination() {
- return destination.getPhysicalName();
- }
-
+
public final String getName() {
return getActiveMQDestination().getPhysicalName();
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java?rev=614206&r1=614205&r2=614206&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java Tue Jan 22 06:28:10 2008
@@ -76,7 +76,7 @@
if (destination.isQueue()) {
if (destination.isTemporary()) {
final ActiveMQTempDestination tempDest = (ActiveMQTempDestination)destination;
- return new Queue(broker.getRoot(), destination, memoryManager, null, destinationStatistics, taskRunnerFactory, broker.getTempDataStore()) {
+ return new Queue(broker.getRoot(), destination, memoryManager, null, destinationStatistics, taskRunnerFactory) {
public void addSubscription(ConnectionContext context, Subscription sub) throws Exception {
// Only consumers on the same connection can consume
@@ -90,7 +90,7 @@
};
} else {
MessageStore store = persistenceAdapter.createQueueMessageStore((ActiveMQQueue)destination);
- Queue queue = new Queue(broker.getRoot(), destination, memoryManager, store, destinationStatistics, taskRunnerFactory, broker.getTempDataStore());
+ Queue queue = new Queue(broker.getRoot(), destination, memoryManager, store, destinationStatistics, taskRunnerFactory);
configureQueue(queue, destination);
queue.initialize();
return queue;
@@ -127,7 +127,7 @@
if (broker.getDestinationPolicy() != null) {
PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(destination);
if (entry != null) {
- entry.configure(queue, broker.getTempDataStore());
+ entry.configure(broker,queue);
}
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java?rev=614206&r1=614205&r2=614206&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java Tue Jan 22 06:28:10 2008
@@ -52,7 +52,7 @@
public DurableTopicSubscription(Broker broker, Destination dest,SystemUsage usageManager, ConnectionContext context, ConsumerInfo info, boolean keepDurableSubsActive)
throws JMSException {
super(broker,usageManager, context, info);
- this.pending = new StoreDurableSubscriberCursor(context.getClientId(), info.getSubscriptionName(), broker.getTempDataStore(), info.getPrefetchSize(), this);
+ this.pending = new StoreDurableSubscriberCursor(broker,context.getClientId(), info.getSubscriptionName(), info.getPrefetchSize(), this);
this.pending.setSystemUsage(usageManager);
this.keepDurableSubsActive = keepDurableSubsActive;
subscriptionKey = new SubscriptionKey(context.getClientId(), info.getSubscriptionName());
@@ -218,17 +218,10 @@
node.decrementReferenceCount();
}
- public String getSubscriptionName() {
- return subscriptionKey.getSubscriptionName();
- }
-
+
public synchronized String toString() {
return "DurableTopicSubscription:" + " consumer=" + info.getConsumerId() + ", destinations=" + destinations.size() + ", total=" + enqueueCounter + ", pending="
+ getPendingQueueSize() + ", dispatched=" + dispatchCounter + ", inflight=" + dispatched.size() + ", prefetchExtension=" + this.prefetchExtension;
- }
-
- public String getClientId() {
- return subscriptionKey.getClientId();
}
public SubscriptionKey getSubscriptionKey() {
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=614206&r1=614205&r2=614206&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Tue Jan 22 06:28:10 2008
@@ -95,14 +95,14 @@
};
};
- public Queue(Broker broker, ActiveMQDestination destination, final SystemUsage systemUsage, MessageStore store, DestinationStatistics parentStats,
- TaskRunnerFactory taskFactory, Store tmpStore) throws Exception {
+ public Queue(Broker broker, ActiveMQDestination destination, final SystemUsage systemUsage,MessageStore store,DestinationStatistics parentStats,
+ TaskRunnerFactory taskFactory) throws Exception {
super(broker, store, destination,systemUsage, parentStats);
- if (destination.isTemporary() || tmpStore==null ) {
+ if (destination.isTemporary() || broker == null || store==null ) {
this.messages = new VMPendingMessageCursor();
} else {
- this.messages = new StoreQueueCursor(this, tmpStore);
+ this.messages = new StoreQueueCursor(broker,this);
}
this.taskRunner = taskFactory.createTaskRunner(this, "Queue " + destination.getPhysicalName());
@@ -318,11 +318,11 @@
final ConnectionContext context = producerExchange.getConnectionContext();
// There is delay between the client sending it and it arriving at the
// destination.. it may have expired.
-
+ message.setRegionDestination(this);
final ProducerInfo producerInfo = producerExchange.getProducerState().getInfo();
final boolean sendProducerAck = !message.isResponseRequired() && producerInfo.getWindowSize() > 0 && !context.isInRecoveryMode();
if (message.isExpired()) {
- broker.messageExpired(context, message);
+ broker.getRoot().messageExpired(context, message);
//message not added to stats yet
//destinationStatistics.getMessages().decrement();
if (sendProducerAck) {
@@ -402,6 +402,7 @@
if (log.isDebugEnabled()) {
log.debug("Expired message: " + message);
}
+ broker.getRoot().messageExpired(context, message);
return;
}
}
@@ -416,7 +417,6 @@
void doMessageSend(final ProducerBrokerExchange producerExchange, final Message message) throws IOException, Exception {
final ConnectionContext context = producerExchange.getConnectionContext();
synchronized (sendLock) {
- message.setRegionDestination(this);
if (store != null && message.isPersistent()) {
while (!systemUsage.getStoreUsage().waitForSpace(1000)) {
if (context.getStopping().get()) {
@@ -678,11 +678,7 @@
// We should only delete messages that can be locked.
if (r.lock(LockOwner.HIGH_PRIORITY_LOCK_OWNER)) {
- MessageAck ack = new MessageAck();
- ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
- ack.setDestination(destination);
- ack.setMessageID(r.getMessageId());
- removeMessage(c, null, r, ack);
+ removeMessage(c,(IndirectMessageReference) r);
}
} catch (IOException e) {
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java?rev=614206&r1=614205&r2=614206&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java Tue Jan 22 06:28:10 2008
@@ -705,14 +705,18 @@
deadLetterDestination);
sent=true;
}
+ }else {
+ //don't want to warn about failing to send
+ // if there isn't a dead letter strategy
+ sent=true;
}
}
}
if(sent==false){
- LOG.warn("Failed to send "+node+" to dead letter queue");
+ LOG.warn("Failed to send "+node+" to DLQ");
}
}catch(Exception e){
- LOG.warn("Failed to pass expired message to dead letter queue",e);
+ LOG.warn("Caught an exception sending to DLQ: "+node,e);
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java?rev=614206&r1=614205&r2=614206&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java Tue Jan 22 06:28:10 2008
@@ -41,7 +41,7 @@
protected Destination createDestination(ConnectionContext context, ActiveMQDestination destination) throws Exception {
final ActiveMQTempDestination tempDest = (ActiveMQTempDestination)destination;
- return new Queue(broker.getRoot(), destination, usageManager, null, destinationStatistics, taskRunnerFactory, null) {
+ return new Queue(broker.getRoot(), destination, usageManager, null, destinationStatistics, taskRunnerFactory) {
public void addSubscription(ConnectionContext context, Subscription sub) throws Exception {
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?rev=614206&r1=614205&r2=614206&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java Tue Jan 22 06:28:10 2008
@@ -184,8 +184,8 @@
}
// Recover the durable subscription.
- String clientId = subscription.getClientId();
- String subscriptionName = subscription.getSubscriptionName();
+ String clientId = subscription.getSubscriptionKey().getClientId();
+ String subscriptionName = subscription.getSubscriptionKey().getSubscriptionName();
String selector = subscription.getConsumerInfo().getSelector();
SubscriptionInfo info = topicStore.lookupSubscription(clientId, subscriptionName);
if (info != null) {
@@ -435,7 +435,8 @@
public void acknowledge(ConnectionContext context, Subscription sub, final MessageAck ack, final MessageReference node) throws IOException {
if (topicStore != null && node.isPersistent()) {
DurableTopicSubscription dsub = (DurableTopicSubscription)sub;
- topicStore.acknowledge(context, dsub.getClientId(), dsub.getSubscriptionName(), node.getMessageId());
+ SubscriptionKey key = dsub.getSubscriptionKey();
+ topicStore.acknowledge(context, key.getClientId(), key.getSubscriptionName(), node.getMessageId());
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java?rev=614206&r1=614205&r2=614206&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java Tue Jan 22 06:28:10 2008
@@ -68,11 +68,10 @@
super(broker, context, info);
this.usageManager = usageManager;
String matchedName = "TopicSubscription:" + CURSOR_NAME_COUNTER.getAndIncrement() + "[" + info.getConsumerId().toString() + "]";
- Store tempDataStore = broker.getTempDataStore();
- if (tempDataStore != null) {
- this.matched = new FilePendingMessageCursor(matchedName, tempDataStore);
- } else {
+ if (info.getDestination().isTemporary() || broker == null || broker.getTempDataStore()==null ) {
this.matched = new VMPendingMessageCursor();
+ } else {
+ this.matched = new FilePendingMessageCursor(broker,matchedName);
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java?rev=614206&r1=614205&r2=614206&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java Tue Jan 22 06:28:10 2008
@@ -21,6 +21,9 @@
import java.util.LinkedList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.activemq.broker.Broker;
+import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.QueueMessageReference;
@@ -32,6 +35,8 @@
import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.usage.Usage;
import org.apache.activemq.usage.UsageListener;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
/**
* persist pending messages pending message (messages awaiting dispatch to a
@@ -40,14 +45,14 @@
* @version $Revision$
*/
public class FilePendingMessageCursor extends AbstractPendingMessageCursor implements UsageListener {
-
+ private static final Log LOG = LogFactory.getLog(FilePendingMessageCursor.class);
private static final AtomicLong NAME_COUNT = new AtomicLong();
-
+ protected Broker broker;
private Store store;
private String name;
private LinkedList<MessageReference> memoryList = new LinkedList<MessageReference>();
private ListContainer<MessageReference> diskList;
- private Iterator iter;
+ private Iterator<MessageReference> iter;
private Destination regionDestination;
private boolean iterating;
private boolean flushRequired;
@@ -58,9 +63,10 @@
* @param name
* @param store
*/
- public FilePendingMessageCursor(String name, Store store) {
+ public FilePendingMessageCursor(Broker broker,String name) {
+ this.broker = broker;
+ this.store= broker.getTempDataStore();
this.name = NAME_COUNT.incrementAndGet() + "_" + name;
- this.store = store;
}
public void start() throws Exception {
@@ -157,19 +163,39 @@
* @param node
*/
public synchronized void addMessageLast(MessageReference node) {
- try {
- regionDestination = node.getMessage().getRegionDestination();
- if (isSpaceInMemoryList()) {
- memoryList.add(node);
- node.incrementReferenceCount();
- } else {
- flushToDisk();
- node.decrementReferenceCount();
+ if (!node.isExpired()) {
+ try {
+ regionDestination = node.getMessage().getRegionDestination();
+ if (isDiskListEmpty()) {
+ if (hasSpace()) {
+ memoryList.add(node);
+ node.incrementReferenceCount();
+ return;
+ }
+ }
+ if (!hasSpace()) {
+ if (isDiskListEmpty()) {
+ expireOldMessages();
+ if (hasSpace()) {
+ memoryList.add(node);
+ node.incrementReferenceCount();
+ return;
+ } else {
+ flushToDisk();
+ }
+ }
+ }
systemUsage.getTempUsage().waitForSpace();
- getDiskList().addLast(node);
+ node.decrementReferenceCount();
+ getDiskList().add(node);
+
+ } catch (Exception e) {
+ LOG.error("Caught an Exception adding a message: " + node
+ + " first to FilePendingMessageCursor ", e);
+ throw new RuntimeException(e);
}
- } catch (Exception e) {
- throw new RuntimeException(e);
+ } else {
+ discard(node);
}
}
@@ -179,19 +205,39 @@
* @param node
*/
public synchronized void addMessageFirst(MessageReference node) {
- try {
- regionDestination = node.getMessage().getRegionDestination();
- if (isSpaceInMemoryList()) {
- memoryList.addFirst(node);
- node.incrementReferenceCount();
- } else {
- flushToDisk();
+ if (!node.isExpired()) {
+ try {
+ regionDestination = node.getMessage().getRegionDestination();
+ if (isDiskListEmpty()) {
+ if (hasSpace()) {
+ memoryList.addFirst(node);
+ node.incrementReferenceCount();
+ return;
+ }
+ }
+ if (!hasSpace()) {
+ if (isDiskListEmpty()) {
+ expireOldMessages();
+ if (hasSpace()) {
+ memoryList.addFirst(node);
+ node.incrementReferenceCount();
+ return;
+ } else {
+ flushToDisk();
+ }
+ }
+ }
systemUsage.getTempUsage().waitForSpace();
node.decrementReferenceCount();
getDiskList().addFirst(node);
+
+ } catch (Exception e) {
+ LOG.error("Caught an Exception adding a message: " + node
+ + " first to FilePendingMessageCursor ", e);
+ throw new RuntimeException(e);
}
- } catch (Exception e) {
- throw new RuntimeException(e);
+ } else {
+ discard(node);
}
}
@@ -271,13 +317,17 @@
super.setSystemUsage(usageManager);
}
- public void onUsageChanged(Usage usage, int oldPercentUsage, int newPercentUsage) {
+ public void onUsageChanged(Usage usage, int oldPercentUsage,
+ int newPercentUsage) {
if (newPercentUsage >= getMemoryUsageHighWaterMark()) {
synchronized (this) {
flushRequired = true;
if (!iterating) {
- flushToDisk();
- flushRequired = false;
+ expireOldMessages();
+ if (!hasSpace()) {
+ flushToDisk();
+ flushRequired = false;
+ }
}
}
}
@@ -290,8 +340,25 @@
protected boolean isSpaceInMemoryList() {
return hasSpace() && isDiskListEmpty();
}
+
+ protected synchronized void expireOldMessages() {
+ if (!memoryList.isEmpty()) {
+ LinkedList<MessageReference> tmpList = new LinkedList<MessageReference>(this.memoryList);
+ this.memoryList = new LinkedList<MessageReference>();
+ while (!tmpList.isEmpty()) {
+ MessageReference node = tmpList.removeFirst();
+ if (node.isExpired()) {
+ discard(node);
+ }else {
+ memoryList.add(node);
+ }
+ }
+ }
+
+ }
protected synchronized void flushToDisk() {
+
if (!memoryList.isEmpty()) {
while (!memoryList.isEmpty()) {
MessageReference node = memoryList.removeFirst();
@@ -312,10 +379,18 @@
diskList = store.getListContainer(name, "TopicSubscription", true);
diskList.setMarshaller(new CommandMarshaller(new OpenWireFormat()));
} catch (IOException e) {
- e.printStackTrace();
+ LOG.error("Caught an IO Exception getting the DiskList ",e);
throw new RuntimeException(e);
}
}
return diskList;
+ }
+
+ protected void discard(MessageReference message) {
+ message.decrementReferenceCount();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Discarding message " + message);
+ }
+ broker.getRoot().sendToDeadLetterQueue(new ConnectionContext(), message);
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java?rev=614206&r1=614205&r2=614206&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java Tue Jan 22 06:28:10 2008
@@ -94,7 +94,7 @@
}
public synchronized void addMessageLast(MessageReference node) throws Exception {
- if (cacheEnabled && !isFull()) {
+ if (cacheEnabled && hasSpace()) {
//optimization - A persistent queue will add the message to
//to store then retrieve it again from the store.
recoverMessage(node.getMessage());
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java?rev=614206&r1=614205&r2=614206&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java Tue Jan 22 06:28:10 2008
@@ -23,13 +23,13 @@
import java.util.Map;
import org.apache.activemq.advisory.AdvisorySupport;
+import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.Topic;
import org.apache.activemq.command.Message;
-import org.apache.activemq.kaha.Store;
import org.apache.activemq.usage.SystemUsage;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -53,16 +53,19 @@
private final Subscription subscription;
/**
+ * @param broker
* @param topic
* @param clientId
* @param subscriberName
+ * @param maxBatchSize
+ * @param subscription
* @throws IOException
*/
- public StoreDurableSubscriberCursor(String clientId, String subscriberName, Store store, int maxBatchSize, Subscription subscription) {
+ public StoreDurableSubscriberCursor(Broker broker,String clientId, String subscriberName,int maxBatchSize, Subscription subscription) {
this.clientId = clientId;
this.subscriberName = subscriberName;
this.subscription = subscription;
- this.nonPersistent = new FilePendingMessageCursor(clientId + subscriberName, store);
+ this.nonPersistent = new FilePendingMessageCursor(broker,clientId + subscriberName);
storePrefetches.add(nonPersistent);
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java?rev=614206&r1=614205&r2=614206&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java Tue Jan 22 06:28:10 2008
@@ -17,6 +17,7 @@
package org.apache.activemq.broker.region.cursors;
import org.apache.activemq.ActiveMQMessageAudit;
+import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.Queue;
import org.apache.activemq.command.Message;
@@ -33,9 +34,9 @@
public class StoreQueueCursor extends AbstractPendingMessageCursor {
private static final Log LOG = LogFactory.getLog(StoreQueueCursor.class);
+ private Broker broker;
private int pendingCount;
private Queue queue;
- private Store tmpStore;
private PendingMessageCursor nonPersistent;
private QueueStorePrefetch persistent;
private boolean started;
@@ -47,9 +48,9 @@
* @param queue
* @param tmpStore
*/
- public StoreQueueCursor(Queue queue, Store tmpStore) {
+ public StoreQueueCursor(Broker broker,Queue queue) {
+ this.broker=broker;
this.queue = queue;
- this.tmpStore = tmpStore;
this.persistent = new QueueStorePrefetch(queue);
currentCursor = persistent;
}
@@ -58,7 +59,7 @@
started = true;
super.start();
if (nonPersistent == null) {
- nonPersistent = new FilePendingMessageCursor(queue.getDestination(), tmpStore);
+ nonPersistent = new FilePendingMessageCursor(broker,queue.getName());
nonPersistent.setMaxBatchSize(getMaxBatchSize());
nonPersistent.setSystemUsage(systemUsage);
nonPersistent.setEnableAudit(isEnableAudit());
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingDurableSubscriberMessageStoragePolicy.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingDurableSubscriberMessageStoragePolicy.java?rev=614206&r1=614205&r2=614206&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingDurableSubscriberMessageStoragePolicy.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingDurableSubscriberMessageStoragePolicy.java Tue Jan 22 06:28:10 2008
@@ -16,10 +16,10 @@
*/
package org.apache.activemq.broker.region.policy;
+import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.cursors.FilePendingMessageCursor;
import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
-import org.apache.activemq.kaha.Store;
/**
* Creates a PendIngMessageCursor for Durable subscribers *
@@ -33,14 +33,15 @@
/**
* Retrieve the configured pending message storage cursor;
+ * @param broker
*
* @param clientId
* @param name
- * @param tmpStorage
* @param maxBatchSize
+ * @param sub
* @return the Pending Message cursor
*/
- public PendingMessageCursor getSubscriberPendingMessageCursor(String clientId, String name, Store tmpStorage, int maxBatchSize, Subscription sub) {
- return new FilePendingMessageCursor(name, tmpStorage);
+ public PendingMessageCursor getSubscriberPendingMessageCursor(Broker broker,String clientId, String name, int maxBatchSize, Subscription sub) {
+ return new FilePendingMessageCursor(broker,name);
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingQueueMessageStoragePolicy.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingQueueMessageStoragePolicy.java?rev=614206&r1=614205&r2=614206&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingQueueMessageStoragePolicy.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingQueueMessageStoragePolicy.java Tue Jan 22 06:28:10 2008
@@ -16,10 +16,10 @@
*/
package org.apache.activemq.broker.region.policy;
+import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.region.Queue;
import org.apache.activemq.broker.region.cursors.FilePendingMessageCursor;
import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
-import org.apache.activemq.kaha.Store;
/**
* Creates a FilePendingMessageCursor *
@@ -32,14 +32,14 @@
public class FilePendingQueueMessageStoragePolicy implements PendingQueueMessageStoragePolicy {
/**
+ * @param broker
* @param queue
- * @param tmpStore
* @return the cursor
* @see org.apache.activemq.broker.region.policy.PendingQueueMessageStoragePolicy#getQueuePendingMessageCursor(org.apache.openjpa.lib.util.concurrent.Queue,
* org.apache.activemq.kaha.Store)
*/
- public PendingMessageCursor getQueuePendingMessageCursor(Queue queue, Store tmpStore) {
- return new FilePendingMessageCursor("PendingCursor:" + queue.getName(), tmpStore);
+ public PendingMessageCursor getQueuePendingMessageCursor(Broker broker,Queue queue) {
+ return new FilePendingMessageCursor(broker,"PendingCursor:" + queue.getName());
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingSubscriberMessageStoragePolicy.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingSubscriberMessageStoragePolicy.java?rev=614206&r1=614205&r2=614206&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingSubscriberMessageStoragePolicy.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingSubscriberMessageStoragePolicy.java Tue Jan 22 06:28:10 2008
@@ -16,9 +16,9 @@
*/
package org.apache.activemq.broker.region.policy;
+import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.region.cursors.FilePendingMessageCursor;
import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
-import org.apache.activemq.kaha.Store;
/**
* Creates a PendIngMessageCursor for Durable subscribers *
@@ -31,15 +31,14 @@
public class FilePendingSubscriberMessageStoragePolicy implements PendingSubscriberMessageStoragePolicy {
/**
+ * @param broker
* @param name
- * @param tmpStorage
* @param maxBatchSize
* @return a Cursor
* @see org.apache.activemq.broker.region.policy.PendingSubscriberMessageStoragePolicy#getSubscriberPendingMessageCursor(java.lang.String,
* org.apache.activemq.kaha.Store, int)
*/
- public PendingMessageCursor getSubscriberPendingMessageCursor(String name, Store tmpStorage,
- int maxBatchSize) {
- return new FilePendingMessageCursor("PendingCursor:" + name, tmpStorage);
+ public PendingMessageCursor getSubscriberPendingMessageCursor(Broker broker,String name,int maxBatchSize) {
+ return new FilePendingMessageCursor(broker,"PendingCursor:" + name);
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PendingDurableSubscriberMessageStoragePolicy.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PendingDurableSubscriberMessageStoragePolicy.java?rev=614206&r1=614205&r2=614206&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PendingDurableSubscriberMessageStoragePolicy.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PendingDurableSubscriberMessageStoragePolicy.java Tue Jan 22 06:28:10 2008
@@ -16,9 +16,9 @@
*/
package org.apache.activemq.broker.region.policy;
+import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
-import org.apache.activemq.kaha.Store;
/**
* Abstraction to allow different policies for holding messages awaiting
@@ -30,12 +30,13 @@
/**
* Retrieve the configured pending message storage cursor;
+ * @param broker
*
* @param clientId
* @param name
- * @param tmpStorage
* @param maxBatchSize
+ * @param sub
* @return the Pending Message cursor
*/
- PendingMessageCursor getSubscriberPendingMessageCursor(String clientId, String name, Store tmpStorage, int maxBatchSize, Subscription sub);
+ PendingMessageCursor getSubscriberPendingMessageCursor(Broker broker,String clientId, String name, int maxBatchSize, Subscription sub);
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PendingQueueMessageStoragePolicy.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PendingQueueMessageStoragePolicy.java?rev=614206&r1=614205&r2=614206&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PendingQueueMessageStoragePolicy.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PendingQueueMessageStoragePolicy.java Tue Jan 22 06:28:10 2008
@@ -16,9 +16,9 @@
*/
package org.apache.activemq.broker.region.policy;
+import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.region.Queue;
import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
-import org.apache.activemq.kaha.Store;
/**
* Abstraction to allow different policies for holding messages awaiting
@@ -30,10 +30,10 @@
/**
* Retrieve the configured pending message storage cursor;
+ * @param broker
*
* @param queue
- * @param tmpStore
* @return the cursor
*/
- PendingMessageCursor getQueuePendingMessageCursor(Queue queue, Store tmpStore);
+ PendingMessageCursor getQueuePendingMessageCursor(Broker broker,Queue queue);
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PendingSubscriberMessageStoragePolicy.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PendingSubscriberMessageStoragePolicy.java?rev=614206&r1=614205&r2=614206&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PendingSubscriberMessageStoragePolicy.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PendingSubscriberMessageStoragePolicy.java Tue Jan 22 06:28:10 2008
@@ -16,8 +16,8 @@
*/
package org.apache.activemq.broker.region.policy;
+import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
-import org.apache.activemq.kaha.Store;
/**
* Abstraction to allow different policies for holding messages awaiting
@@ -29,11 +29,11 @@
/**
* Retrieve the configured pending message storage cursor;
+ * @param broker
*
* @param name
- * @param tmpStorage
* @param maxBatchSize
* @return the Pending Message cursor
*/
- PendingMessageCursor getSubscriberPendingMessageCursor(String name, Store tmpStorage, int maxBatchSize);
+ PendingMessageCursor getSubscriberPendingMessageCursor(Broker broker,String name,int maxBatchSize);
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java?rev=614206&r1=614205&r2=614206&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java Tue Jan 22 06:28:10 2008
@@ -25,7 +25,6 @@
import org.apache.activemq.broker.region.group.MessageGroupHashBucketFactory;
import org.apache.activemq.broker.region.group.MessageGroupMapFactory;
import org.apache.activemq.filter.DestinationMapEntry;
-import org.apache.activemq.kaha.Store;
import org.apache.activemq.usage.SystemUsage;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -58,7 +57,7 @@
private boolean producerFlowControl = true;
private boolean optimizedDispatch=false;
- public void configure(Queue queue, Store tmpStore) {
+ public void configure(Broker broker,Queue queue) {
if (dispatchPolicy != null) {
queue.setDispatchPolicy(dispatchPolicy);
}
@@ -70,7 +69,7 @@
queue.getMemoryUsage().setLimit(memoryLimit);
}
if (pendingQueuePolicy != null) {
- PendingMessageCursor messages = pendingQueuePolicy.getQueuePendingMessageCursor(queue, tmpStore);
+ PendingMessageCursor messages = pendingQueuePolicy.getQueuePendingMessageCursor(broker,queue);
queue.setMessages(messages);
}
queue.setProducerFlowControl(isProducerFlowControl());
@@ -121,16 +120,16 @@
if (pendingSubscriberPolicy != null) {
String name = subscription.getContext().getClientId() + "_" + subscription.getConsumerInfo().getConsumerId();
int maxBatchSize = subscription.getConsumerInfo().getPrefetchSize();
- subscription.setMatched(pendingSubscriberPolicy.getSubscriberPendingMessageCursor(name, broker.getTempDataStore(), maxBatchSize));
+ subscription.setMatched(pendingSubscriberPolicy.getSubscriberPendingMessageCursor(broker,name, maxBatchSize));
}
}
public void configure(Broker broker, SystemUsage memoryManager, DurableTopicSubscription sub) {
- String clientId = sub.getClientId();
- String subName = sub.getSubscriptionName();
+ String clientId = sub.getSubscriptionKey().getClientId();
+ String subName = sub.getSubscriptionKey().getSubscriptionName();
int prefetch = sub.getPrefetchSize();
if (pendingDurableSubscriberPolicy != null) {
- PendingMessageCursor cursor = pendingDurableSubscriberPolicy.getSubscriberPendingMessageCursor(clientId, subName, broker.getTempDataStore(), prefetch, sub);
+ PendingMessageCursor cursor = pendingDurableSubscriberPolicy.getSubscriberPendingMessageCursor(broker,clientId, subName,prefetch,sub);
cursor.setSystemUsage(memoryManager);
sub.setPending(cursor);
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/StorePendingDurableSubscriberMessageStoragePolicy.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/StorePendingDurableSubscriberMessageStoragePolicy.java?rev=614206&r1=614205&r2=614206&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/StorePendingDurableSubscriberMessageStoragePolicy.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/StorePendingDurableSubscriberMessageStoragePolicy.java Tue Jan 22 06:28:10 2008
@@ -16,6 +16,7 @@
*/
package org.apache.activemq.broker.region.policy;
+import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
import org.apache.activemq.broker.region.cursors.StoreDurableSubscriberCursor;
@@ -34,14 +35,15 @@
/**
* Retrieve the configured pending message storage cursor;
+ * @param broker
*
* @param clientId
* @param name
- * @param tmpStorage
* @param maxBatchSize
+ * @param sub
* @return the Pending Message cursor
*/
- public PendingMessageCursor getSubscriberPendingMessageCursor(String clientId, String name, Store tmpStorage, int maxBatchSize, Subscription sub) {
- return new StoreDurableSubscriberCursor(clientId, name, tmpStorage, maxBatchSize, sub);
+ public PendingMessageCursor getSubscriberPendingMessageCursor(Broker broker,String clientId, String name, int maxBatchSize, Subscription sub) {
+ return new StoreDurableSubscriberCursor(broker,clientId, name, maxBatchSize, sub);
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/StorePendingQueueMessageStoragePolicy.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/StorePendingQueueMessageStoragePolicy.java?rev=614206&r1=614205&r2=614206&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/StorePendingQueueMessageStoragePolicy.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/StorePendingQueueMessageStoragePolicy.java Tue Jan 22 06:28:10 2008
@@ -16,10 +16,10 @@
*/
package org.apache.activemq.broker.region.policy;
+import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.region.Queue;
import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
import org.apache.activemq.broker.region.cursors.StoreQueueCursor;
-import org.apache.activemq.kaha.Store;
/**
* Creates a StoreQueueCursor *
@@ -32,14 +32,14 @@
public class StorePendingQueueMessageStoragePolicy implements PendingQueueMessageStoragePolicy {
/**
+ * @param broker
* @param queue
- * @param tmpStore
* @return the cursor
* @see org.apache.activemq.broker.region.policy.PendingQueueMessageStoragePolicy#getQueuePendingMessageCursor(org.apache.openjpa.lib.util.concurrent.Queue,
* org.apache.activemq.kaha.Store)
*/
- public PendingMessageCursor getQueuePendingMessageCursor(Queue queue, Store tmpStore) {
- return new StoreQueueCursor(queue, tmpStore);
+ public PendingMessageCursor getQueuePendingMessageCursor(Broker broker,Queue queue) {
+ return new StoreQueueCursor(broker,queue);
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingDurableSubscriberMessageStoragePolicy.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingDurableSubscriberMessageStoragePolicy.java?rev=614206&r1=614205&r2=614206&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingDurableSubscriberMessageStoragePolicy.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingDurableSubscriberMessageStoragePolicy.java Tue Jan 22 06:28:10 2008
@@ -16,10 +16,10 @@
*/
package org.apache.activemq.broker.region.policy;
+import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor;
-import org.apache.activemq.kaha.Store;
/**
* Creates a VMPendingMessageCursor *
@@ -32,14 +32,14 @@
/**
* Retrieve the configured pending message storage cursor;
- *
+ * @param broker
* @param clientId
* @param name
- * @param tmpStorage
* @param maxBatchSize
+ * @param sub
* @return the Pending Message cursor
*/
- public PendingMessageCursor getSubscriberPendingMessageCursor(String clientId, String name, Store tmpStorage, int maxBatchSize, Subscription sub) {
+ public PendingMessageCursor getSubscriberPendingMessageCursor(Broker broker,String clientId, String name,int maxBatchSize, Subscription sub) {
return new VMPendingMessageCursor();
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingQueueMessageStoragePolicy.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingQueueMessageStoragePolicy.java?rev=614206&r1=614205&r2=614206&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingQueueMessageStoragePolicy.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingQueueMessageStoragePolicy.java Tue Jan 22 06:28:10 2008
@@ -16,10 +16,10 @@
*/
package org.apache.activemq.broker.region.policy;
+import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.region.Queue;
import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor;
-import org.apache.activemq.kaha.Store;
/**
* Creates a VMPendingMessageCursor *
@@ -32,11 +32,11 @@
public class VMPendingQueueMessageStoragePolicy implements PendingQueueMessageStoragePolicy {
/**
+ * @param broker
* @param queue
- * @param tmpStore
* @return the cursor
*/
- public PendingMessageCursor getQueuePendingMessageCursor(Queue queue, Store tmpStore) {
+ public PendingMessageCursor getQueuePendingMessageCursor(Broker broker,Queue queue) {
return new VMPendingMessageCursor();
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingSubscriberMessageStoragePolicy.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingSubscriberMessageStoragePolicy.java?rev=614206&r1=614205&r2=614206&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingSubscriberMessageStoragePolicy.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingSubscriberMessageStoragePolicy.java Tue Jan 22 06:28:10 2008
@@ -16,9 +16,9 @@
*/
package org.apache.activemq.broker.region.policy;
+import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor;
-import org.apache.activemq.kaha.Store;
/**
* Creates a VMPendingMessageCursor *
@@ -31,15 +31,14 @@
public class VMPendingSubscriberMessageStoragePolicy implements PendingSubscriberMessageStoragePolicy {
/**
+ * @param broker
* @param name
- * @param tmpStorage
* @param maxBatchSize
* @return a Cursor
* @see org.apache.activemq.broker.region.policy.PendingSubscriberMessageStoragePolicy#getSubscriberPendingMessageCursor(java.lang.String,
* org.apache.activemq.kaha.Store, int)
*/
- public PendingMessageCursor getSubscriberPendingMessageCursor(String name, Store tmpStorage,
- int maxBatchSize) {
+ public PendingMessageCursor getSubscriberPendingMessageCursor(Broker broker,String name,int maxBatchSize) {
return new VMPendingMessageCursor();
}
}