You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jl...@apache.org on 2007/03/05 17:26:13 UTC

svn commit: r514720 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker: ConnectionContext.java TransportConnection.java region/Queue.java region/Topic.java

Author: jlim
Date: Mon Mar  5 08:26:08 2007
New Revision: 514720

URL: http://svn.apache.org/viewvc?view=rev&rev=514720
Log:
ported fix to trunk : 
http://issues.apache.org/activemq/browse/AMQ-1181

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java?view=diff&rev=514720&r1=514719&r2=514720
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java Mon Mar  5 08:26:08 2007
@@ -54,7 +54,7 @@
     private boolean producerFlowControl=true;
     private MessageAuthorizationPolicy messageAuthorizationPolicy;
     private AtomicInteger referenceCounter = new AtomicInteger();
-    
+    private boolean networkConnection;
     private final MessageEvaluationContext messageEvaluationContext = new MessageEvaluationContext();
     
     public ConnectionContext() {
@@ -246,4 +246,11 @@
 		return referenceCounter.decrementAndGet();
 	}
 
+	public synchronized boolean isNetworkConnection() {
+		return networkConnection;
+	}
+
+	public synchronized void setNetworkConnection(boolean networkConnection) {
+		this.networkConnection = networkConnection;
+	}	
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java?view=diff&rev=514720&r1=514719&r2=514720
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java Mon Mar  5 08:26:08 2007
@@ -119,7 +119,8 @@
     private final Map<ConsumerId,ConsumerBrokerExchange>consumerExchanges = new HashMap<ConsumerId,ConsumerBrokerExchange>();
     private CountDownLatch dispatchStoppedLatch = new CountDownLatch(1);
     protected AtomicBoolean dispatchStopped=new AtomicBoolean(false);
-
+    private boolean networkConnection;
+    
     static class ConnectionState extends org.apache.activemq.state.ConnectionState{
 
         private final ConnectionContext context;
@@ -627,6 +628,7 @@
         context.setUserName(info.getUserName());
         context.setConnectionId(info.getConnectionId());
         context.setWireFormatInfo(wireFormatInfo);
+        context.setNetworkConnection(networkConnection);
         context.incrementReference();
         this.manageable=info.isManageable();
         state=new ConnectionState(info,context,this);
@@ -1027,6 +1029,12 @@
         }
         this.brokerInfo=info;
         broker.addBroker(this,info);
+        networkConnection = true;
+        for (Iterator iter = localConnectionStates.values().iterator(); iter.hasNext();) {
+            ConnectionState cs = (ConnectionState) iter.next();
+            cs.getContext().setNetworkConnection(true);
+        }   
+        
         return null;
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?view=diff&rev=514720&r1=514719&r2=514720
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Mon Mar  5 08:26:08 2007
@@ -325,7 +325,7 @@
             }
             return;
         }
-        if(context.isProducerFlowControl()){
+        if (context.isProducerFlowControl() && !context.isNetworkConnection()) {
             if(usageManager.isSendFailIfNoSpace()&&usageManager.isFull()){
                 throw new javax.jms.ResourceAllocationException("Usage Manager memory limit reached");
             }else{

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?view=diff&rev=514720&r1=514719&r2=514720
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java Mon Mar  5 08:26:08 2007
@@ -243,7 +243,7 @@
     	if( message.isExpired() ) {
     		return;
     	}
-        if (context.isProducerFlowControl()) {
+    	if (context.isProducerFlowControl()  && !context.isNetworkConnection() ) {
             if (usageManager.isSendFailIfNoSpace() && usageManager.isFull()) {
                 throw new javax.jms.ResourceAllocationException("Usage Manager memory limit reached");
             } else {