You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2008/08/15 16:29:28 UTC
svn commit: r686236 - in
/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region:
AbstractSubscription.java BaseDestination.java Queue.java Subscription.java
Topic.java policy/PolicyEntry.java
Author: rajdavies
Date: Fri Aug 15 07:29:27 2008
New Revision: 686236
URL: http://svn.apache.org/viewvc?rev=686236&view=rev
Log:
Fix for https://issues.apache.org/activemq/browse/AMQ-1878
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java?rev=686236&r1=686235&r2=686236&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java Fri Aug 15 07:29:27 2008
@@ -198,7 +198,7 @@
public void addDestination(Destination destination) {
}
-
+
/**
* Remove a destination
@@ -207,6 +207,10 @@
public void removeDestination(Destination destination) {
}
+
+ public int countBeforeFull() {
+ return getDispatchedQueueSize() - info.getPrefetchSize();
+ }
protected void doAddRecoveredMessage(MessageReference message) throws Exception {
add(message);
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java?rev=686236&r1=686235&r2=686236&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java Fri Aug 15 07:29:27 2008
@@ -34,10 +34,10 @@
*/
public abstract class BaseDestination implements Destination {
/**
- * The default number of messages to page in to the destination
+ * The maximum number of messages to page in to the destination
* from persistent storage
*/
- public static final int DEFAULT_PAGE_SIZE=200;
+ public static final int MAX_PAGE_SIZE=200;
protected final ActiveMQDestination destination;
protected final Broker broker;
@@ -48,7 +48,7 @@
private int maxProducersToAudit=1024;
private int maxAuditDepth=2048;
private boolean enableAudit=true;
- private int maxPageSize=DEFAULT_PAGE_SIZE;
+ private int maxPageSize=MAX_PAGE_SIZE;
private boolean useCache=true;
private int minimumMessageSize=1024;
private boolean lazyDispatch=false;
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=686236&r1=686235&r2=686236&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Fri Aug 15 07:29:27 2008
@@ -228,7 +228,6 @@
// duplicates
// etc.
doPageIn(false);
-// msgContext.setDestination(destination);
synchronized (pagedInMessages) {
RecoveryDispatch rd = new RecoveryDispatch();
@@ -240,13 +239,17 @@
if( sub instanceof QueueBrowserSubscription ) {
((QueueBrowserSubscription)sub).incrementQueueRef();
}
-
+ if (!this.optimizedDispatch) {
+ wakeup();
+ }
}finally {
dispatchLock.unlock();
}
+ if (this.optimizedDispatch) {
// Outside of dispatchLock() to maintain the lock hierarchy of
// iteratingMutex -> dispatchLock. - see https://issues.apache.org/activemq/browse/AMQ-1878
- wakeup();
+ wakeup();
+ }
}
public void removeSubscription(ConnectionContext context, Subscription sub)
@@ -300,12 +303,17 @@
if (consumers.isEmpty()) {
messages.gc();
}
+ if (!this.optimizedDispatch) {
+ wakeup();
+ }
}finally {
dispatchLock.unlock();
}
- // Outside of dispatchLock() to maintain the lock hierarchy of
- // iteratingMutex -> dispatchLock. - see https://issues.apache.org/activemq/browse/AMQ-1878
- wakeup();
+ if (this.optimizedDispatch) {
+ // Outside of dispatchLock() to maintain the lock hierarchy of
+ // iteratingMutex -> dispatchLock. - see https://issues.apache.org/activemq/browse/AMQ-1878
+ wakeup();
+ }
}
public void send(final ProducerBrokerExchange producerExchange, final Message message) throws Exception {
@@ -1099,6 +1107,7 @@
dispatchLock.lock();
try{
int toPageIn = (getMaxPageSize()+(int)destinationStatistics.getInflight().getCount()) - pagedInMessages.size();
+ toPageIn = Math.min(toPageIn,getMaxPageSize());
if (isLazyDispatch()&& !force) {
// Only page in the minimum number of messages which can be dispatched immediately.
toPageIn = Math.min(getConsumerMessageCountBeforeFull(), toPageIn);
@@ -1142,7 +1151,7 @@
dispatchLock.lock();
try {
if(!pagedInPendingDispatch.isEmpty()) {
-// System.out.println(getName()+": dispatching from pending: "+pagedInPendingDispatch.size());
+ // System.out.println(getName()+": dispatching from pending: "+pagedInPendingDispatch.size());
// Try to first dispatch anything that had not been dispatched before.
pagedInPendingDispatch = doActualDispatch(pagedInPendingDispatch);
// System.out.println(getName()+": new pending list1: "+pagedInPendingDispatch.size());
@@ -1237,9 +1246,8 @@
boolean zeroPrefetch = false;
synchronized (consumers) {
for (Subscription s : consumers) {
- PrefetchSubscription ps = (PrefetchSubscription) s;
- zeroPrefetch |= ps.getPrefetchSize() == 0;
- int countBeforeFull = ps.countBeforeFull();
+ zeroPrefetch |= s.getPrefetchSize() == 0;
+ int countBeforeFull = s.countBeforeFull();
total += countBeforeFull;
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java?rev=686236&r1=686235&r2=686236&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java Fri Aug 15 07:29:27 2008
@@ -216,4 +216,9 @@
* @return true if a browser
*/
boolean isBrowser();
+
+ /**
+ * @return the number of messages this subscription can accept before it is full
+ */
+ int countBeforeFull();
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?rev=686236&r1=686235&r2=686236&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java Fri Aug 15 07:29:27 2008
@@ -220,11 +220,8 @@
if (subscription.matches(message, msgContext)) {
subscription.add(message);
}
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
} catch (IOException e) {
- // TODO: Need to handle this better.
- e.printStackTrace();
+ LOG.error("Failed to recover this message " + message);
}
return true;
}
@@ -570,7 +567,6 @@
return;
}
}
-
MessageEvaluationContext msgContext = context.getMessageEvaluationContext();
msgContext.setDestination(destination);
msgContext.setMessageReference(message);
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java?rev=686236&r1=686235&r2=686236&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java Fri Aug 15 07:29:27 2008
@@ -58,7 +58,7 @@
private boolean enableAudit=true;
private boolean producerFlowControl = true;
private boolean optimizedDispatch=false;
- private int maxPageSize=100;
+ private int maxPageSize=BaseDestination.MAX_PAGE_SIZE;
private boolean useCache=true;
private long minimumMessageSize=1024;
private boolean useConsumerPriority=true;