You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2011/04/04 13:11:36 UTC

svn commit: r1088557 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/region/ main/java/org/apache/activemq/broker/region/policy/ test/java/org/apache/activemq/usecases/

Author: gtully
Date: Mon Apr  4 11:11:36 2011
New Revision: 1088557

URL: http://svn.apache.org/viewvc?rev=1088557&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3253 - Support Temporary Destinations in a network without advisories. Allow gc of regular destinations that have just network consumers, hub/spoke case with dynamic real reply destinations. policy entry: GcWithNetworkConsumers now applicable to all destinations. addition to https://issues.apache.org/jira/browse/AMQ-2571

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueue.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempTopic.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/RequestReplyNoAdvisoryNetworkTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java?rev=1088557&r1=1088556&r2=1088557&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java Mon Apr  4 11:11:36 2011
@@ -18,6 +18,7 @@ package org.apache.activemq.broker.regio
 
 import java.io.IOException;
 import java.util.Collection;
+import java.util.List;
 import javax.jms.ResourceAllocationException;
 import org.apache.activemq.advisory.AdvisorySupport;
 import org.apache.activemq.broker.Broker;
@@ -89,6 +90,7 @@ public abstract class BaseDestination im
     private boolean prioritizedMessages;
     private long inactiveTimoutBeforeGC = DEFAULT_INACTIVE_TIMEOUT_BEFORE_GC;
     private boolean gcIfInactive;
+    private boolean gcWithNetworkConsumers;
     private long lastActiveTime=0l;
     private boolean reduceMemoryFootprint = false;
 
@@ -243,7 +245,12 @@ public abstract class BaseDestination im
     }
 
     public boolean isActive() {
-        return destinationStatistics.getConsumers().getCount() != 0 || destinationStatistics.getProducers().getCount() != 0;
+        boolean isActive = destinationStatistics.getConsumers().getCount() != 0 ||
+                           destinationStatistics.getProducers().getCount() != 0;
+        if (isActive && isGcWithNetworkConsumers() && destinationStatistics.getConsumers().getCount() != 0) {
+            isActive = hasRegularConsumers(getConsumers());
+        }
+        return isActive;
     }
 
     public int getMaxPageSize() {
@@ -650,7 +657,19 @@ public abstract class BaseDestination im
     public void setGcIfInactive(boolean gcIfInactive) {
         this.gcIfInactive = gcIfInactive;
     }
-    
+
+    /**
+     * Indicate if it is ok to gc destinations that have only network consumers
+     * @param gcWithNetworkConsumers
+     */
+    public void setGcWithNetworkConsumers(boolean gcWithNetworkConsumers) {
+        this.gcWithNetworkConsumers = gcWithNetworkConsumers;
+    }
+
+    public boolean isGcWithNetworkConsumers() {
+        return gcWithNetworkConsumers;
+    }
+
     public void markForGC(long timeStamp) {
         if (isGcIfInactive() && this.lastActiveTime == 0 && isActive() == false
                 && destinationStatistics.messages.getCount() == 0 && getInactiveTimoutBeforeGC() > 0l) {
@@ -676,7 +695,9 @@ public abstract class BaseDestination im
         return this.reduceMemoryFootprint;
     }
 
-   protected boolean hasRegularConsumers(Collection<Subscription> consumers) {
+    public abstract List<Subscription> getConsumers();
+
+    protected boolean hasRegularConsumers(List<Subscription> consumers) {
         boolean hasRegularConsumers = false;
         for (Subscription subscription: consumers) {
             if (!subscription.getConsumerInfo().isNetworkSubscription()) {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueue.java?rev=1088557&r1=1088556&r2=1088557&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueue.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueue.java Mon Apr  4 11:11:36 2011
@@ -90,15 +90,4 @@ public class TempQueue extends Queue{
         }
         super.dispose(context);
     }
-
-    @Override
-    public boolean isActive() {
-        boolean isActive = super.isActive();
-        if (isActive && brokerService.isAllowTempAutoCreationOnSend()) {
-            synchronized (consumers) {
-                isActive = hasRegularConsumers(consumers);
-            }
-        }
-        return isActive;
-    }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempTopic.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempTopic.java?rev=1088557&r1=1088556&r2=1088557&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempTopic.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempTopic.java Mon Apr  4 11:11:36 2011
@@ -68,15 +68,4 @@ public class TempTopic  extends Topic  i
     
     public void initialize() {
     }
-
-    @Override
-    public boolean isActive() {
-        boolean isActive = super.isActive();
-        if (isActive && brokerService.isAllowTempAutoCreationOnSend()) {
-            synchronized (consumers) {
-                isActive = hasRegularConsumers(consumers);
-            }
-        }
-        return isActive;
-    }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java?rev=1088557&r1=1088556&r2=1088557&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java Mon Apr  4 11:11:36 2011
@@ -90,10 +90,11 @@ public class PolicyEntry extends Destina
     private boolean prioritizedMessages;
     private boolean allConsumersExclusiveByDefault;
     private boolean gcInactiveDestinations;
+    private boolean gcWithNetworkConsumers;
     private long inactiveTimoutBeforeGC = BaseDestination.DEFAULT_INACTIVE_TIMEOUT_BEFORE_GC;
     private boolean reduceMemoryFootprint;
-    
-   
+
+
     public void configure(Broker broker,Queue queue) {
         baseConfiguration(broker,queue);
         if (dispatchPolicy != null) {
@@ -163,6 +164,7 @@ public class PolicyEntry extends Destina
         destination.setSlowConsumerStrategy(scs);
         destination.setPrioritizedMessages(isPrioritizedMessages());
         destination.setGcIfInactive(isGcInactiveDestinations());
+        destination.setGcWithNetworkConsumers(isGcWithNetworkConsumers());
         destination.setInactiveTimoutBeforeGC(getInactiveTimoutBeforeGC());
         destination.setReduceMemoryFootprint(isReduceMemoryFootprint());
     }
@@ -787,7 +789,15 @@ public class PolicyEntry extends Destina
     public void setInactiveTimoutBeforeGC(long inactiveTimoutBeforeGC) {
         this.inactiveTimoutBeforeGC = inactiveTimoutBeforeGC;
     }
-    
+
+    public void setGcWithNetworkConsumers(boolean gcWithNetworkConsumers) {
+        this.gcWithNetworkConsumers = gcWithNetworkConsumers;
+    }
+
+    public boolean isGcWithNetworkConsumers() {
+        return gcWithNetworkConsumers;
+    }
+
     public boolean isReduceMemoryFootprint() {
         return reduceMemoryFootprint;
     }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/RequestReplyNoAdvisoryNetworkTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/RequestReplyNoAdvisoryNetworkTest.java?rev=1088557&r1=1088556&r2=1088557&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/RequestReplyNoAdvisoryNetworkTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/RequestReplyNoAdvisoryNetworkTest.java Mon Apr  4 11:11:36 2011
@@ -264,6 +264,7 @@ public class RequestReplyNoAdvisoryNetwo
         PolicyEntry tempReplyQPolicy = new PolicyEntry();
         tempReplyQPolicy.setOptimizedDispatch(true);
         tempReplyQPolicy.setGcInactiveDestinations(true);
+        tempReplyQPolicy.setGcWithNetworkConsumers(true);
         tempReplyQPolicy.setInactiveTimoutBeforeGC(10*1000);
         map.put(replyQWildcard, tempReplyQPolicy);
         broker.setDestinationPolicy(map);