You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2011/04/04 13:11:36 UTC
svn commit: r1088557 - in /activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/broker/region/
main/java/org/apache/activemq/broker/region/policy/
test/java/org/apache/activemq/usecases/
Author: gtully
Date: Mon Apr 4 11:11:36 2011
New Revision: 1088557
URL: http://svn.apache.org/viewvc?rev=1088557&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3253 - Support temporary destinations in a network without advisories. Allow GC of regular destinations that have only network consumers (the hub/spoke case with dynamic, real reply destinations). The policy entry option gcWithNetworkConsumers is now applicable to all destinations. This is an addition to https://issues.apache.org/jira/browse/AMQ-2571
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueue.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempTopic.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/RequestReplyNoAdvisoryNetworkTest.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java?rev=1088557&r1=1088556&r2=1088557&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java Mon Apr 4 11:11:36 2011
@@ -18,6 +18,7 @@ package org.apache.activemq.broker.regio
import java.io.IOException;
import java.util.Collection;
+import java.util.List;
import javax.jms.ResourceAllocationException;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.Broker;
@@ -89,6 +90,7 @@ public abstract class BaseDestination im
private boolean prioritizedMessages;
private long inactiveTimoutBeforeGC = DEFAULT_INACTIVE_TIMEOUT_BEFORE_GC;
private boolean gcIfInactive;
+ private boolean gcWithNetworkConsumers;
private long lastActiveTime=0l;
private boolean reduceMemoryFootprint = false;
@@ -243,7 +245,12 @@ public abstract class BaseDestination im
}
public boolean isActive() {
- return destinationStatistics.getConsumers().getCount() != 0 || destinationStatistics.getProducers().getCount() != 0;
+ boolean isActive = destinationStatistics.getConsumers().getCount() != 0 ||
+ destinationStatistics.getProducers().getCount() != 0;
+ if (isActive && isGcWithNetworkConsumers() && destinationStatistics.getConsumers().getCount() != 0) {
+ isActive = hasRegularConsumers(getConsumers());
+ }
+ return isActive;
}
public int getMaxPageSize() {
@@ -650,7 +657,19 @@ public abstract class BaseDestination im
public void setGcIfInactive(boolean gcIfInactive) {
this.gcIfInactive = gcIfInactive;
}
-
+
+ /**
+ * Indicate if it is ok to gc destinations that have only network consumers
+ * @param gcWithNetworkConsumers
+ */
+ public void setGcWithNetworkConsumers(boolean gcWithNetworkConsumers) {
+ this.gcWithNetworkConsumers = gcWithNetworkConsumers;
+ }
+
+ public boolean isGcWithNetworkConsumers() {
+ return gcWithNetworkConsumers;
+ }
+
public void markForGC(long timeStamp) {
if (isGcIfInactive() && this.lastActiveTime == 0 && isActive() == false
&& destinationStatistics.messages.getCount() == 0 && getInactiveTimoutBeforeGC() > 0l) {
@@ -676,7 +695,9 @@ public abstract class BaseDestination im
return this.reduceMemoryFootprint;
}
- protected boolean hasRegularConsumers(Collection<Subscription> consumers) {
+ public abstract List<Subscription> getConsumers();
+
+ protected boolean hasRegularConsumers(List<Subscription> consumers) {
boolean hasRegularConsumers = false;
for (Subscription subscription: consumers) {
if (!subscription.getConsumerInfo().isNetworkSubscription()) {
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueue.java?rev=1088557&r1=1088556&r2=1088557&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueue.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueue.java Mon Apr 4 11:11:36 2011
@@ -90,15 +90,4 @@ public class TempQueue extends Queue{
}
super.dispose(context);
}
-
- @Override
- public boolean isActive() {
- boolean isActive = super.isActive();
- if (isActive && brokerService.isAllowTempAutoCreationOnSend()) {
- synchronized (consumers) {
- isActive = hasRegularConsumers(consumers);
- }
- }
- return isActive;
- }
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempTopic.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempTopic.java?rev=1088557&r1=1088556&r2=1088557&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempTopic.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempTopic.java Mon Apr 4 11:11:36 2011
@@ -68,15 +68,4 @@ public class TempTopic extends Topic i
public void initialize() {
}
-
- @Override
- public boolean isActive() {
- boolean isActive = super.isActive();
- if (isActive && brokerService.isAllowTempAutoCreationOnSend()) {
- synchronized (consumers) {
- isActive = hasRegularConsumers(consumers);
- }
- }
- return isActive;
- }
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java?rev=1088557&r1=1088556&r2=1088557&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java Mon Apr 4 11:11:36 2011
@@ -90,10 +90,11 @@ public class PolicyEntry extends Destina
private boolean prioritizedMessages;
private boolean allConsumersExclusiveByDefault;
private boolean gcInactiveDestinations;
+ private boolean gcWithNetworkConsumers;
private long inactiveTimoutBeforeGC = BaseDestination.DEFAULT_INACTIVE_TIMEOUT_BEFORE_GC;
private boolean reduceMemoryFootprint;
-
-
+
+
public void configure(Broker broker,Queue queue) {
baseConfiguration(broker,queue);
if (dispatchPolicy != null) {
@@ -163,6 +164,7 @@ public class PolicyEntry extends Destina
destination.setSlowConsumerStrategy(scs);
destination.setPrioritizedMessages(isPrioritizedMessages());
destination.setGcIfInactive(isGcInactiveDestinations());
+ destination.setGcWithNetworkConsumers(isGcWithNetworkConsumers());
destination.setInactiveTimoutBeforeGC(getInactiveTimoutBeforeGC());
destination.setReduceMemoryFootprint(isReduceMemoryFootprint());
}
@@ -787,7 +789,15 @@ public class PolicyEntry extends Destina
public void setInactiveTimoutBeforeGC(long inactiveTimoutBeforeGC) {
this.inactiveTimoutBeforeGC = inactiveTimoutBeforeGC;
}
-
+
+ public void setGcWithNetworkConsumers(boolean gcWithNetworkConsumers) {
+ this.gcWithNetworkConsumers = gcWithNetworkConsumers;
+ }
+
+ public boolean isGcWithNetworkConsumers() {
+ return gcWithNetworkConsumers;
+ }
+
public boolean isReduceMemoryFootprint() {
return reduceMemoryFootprint;
}
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/RequestReplyNoAdvisoryNetworkTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/RequestReplyNoAdvisoryNetworkTest.java?rev=1088557&r1=1088556&r2=1088557&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/RequestReplyNoAdvisoryNetworkTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/RequestReplyNoAdvisoryNetworkTest.java Mon Apr 4 11:11:36 2011
@@ -264,6 +264,7 @@ public class RequestReplyNoAdvisoryNetwo
PolicyEntry tempReplyQPolicy = new PolicyEntry();
tempReplyQPolicy.setOptimizedDispatch(true);
tempReplyQPolicy.setGcInactiveDestinations(true);
+ tempReplyQPolicy.setGcWithNetworkConsumers(true);
tempReplyQPolicy.setInactiveTimoutBeforeGC(10*1000);
map.put(replyQWildcard, tempReplyQPolicy);
broker.setDestinationPolicy(map);