You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jl...@apache.org on 2007/03/05 17:26:13 UTC
svn commit: r514720 - in
/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker:
ConnectionContext.java TransportConnection.java region/Queue.java
region/Topic.java
Author: jlim
Date: Mon Mar 5 08:26:08 2007
New Revision: 514720
URL: http://svn.apache.org/viewvc?view=rev&rev=514720
Log:
Ported fix to trunk:
http://issues.apache.org/activemq/browse/AMQ-1181
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java?view=diff&rev=514720&r1=514719&r2=514720
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java Mon Mar 5 08:26:08 2007
@@ -54,7 +54,7 @@
private boolean producerFlowControl=true;
private MessageAuthorizationPolicy messageAuthorizationPolicy;
private AtomicInteger referenceCounter = new AtomicInteger();
-
+ private boolean networkConnection;
private final MessageEvaluationContext messageEvaluationContext = new MessageEvaluationContext();
public ConnectionContext() {
@@ -246,4 +246,11 @@
return referenceCounter.decrementAndGet();
}
+ public synchronized boolean isNetworkConnection() {
+ return networkConnection;
+ }
+
+ public synchronized void setNetworkConnection(boolean networkConnection) {
+ this.networkConnection = networkConnection;
+ }
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java?view=diff&rev=514720&r1=514719&r2=514720
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java Mon Mar 5 08:26:08 2007
@@ -119,7 +119,8 @@
private final Map<ConsumerId,ConsumerBrokerExchange>consumerExchanges = new HashMap<ConsumerId,ConsumerBrokerExchange>();
private CountDownLatch dispatchStoppedLatch = new CountDownLatch(1);
protected AtomicBoolean dispatchStopped=new AtomicBoolean(false);
-
+ private boolean networkConnection;
+
static class ConnectionState extends org.apache.activemq.state.ConnectionState{
private final ConnectionContext context;
@@ -627,6 +628,7 @@
context.setUserName(info.getUserName());
context.setConnectionId(info.getConnectionId());
context.setWireFormatInfo(wireFormatInfo);
+ context.setNetworkConnection(networkConnection);
context.incrementReference();
this.manageable=info.isManageable();
state=new ConnectionState(info,context,this);
@@ -1027,6 +1029,12 @@
}
this.brokerInfo=info;
broker.addBroker(this,info);
+ networkConnection = true;
+ for (Iterator iter = localConnectionStates.values().iterator(); iter.hasNext();) {
+ ConnectionState cs = (ConnectionState) iter.next();
+ cs.getContext().setNetworkConnection(true);
+ }
+
return null;
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?view=diff&rev=514720&r1=514719&r2=514720
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Mon Mar 5 08:26:08 2007
@@ -325,7 +325,7 @@
}
return;
}
- if(context.isProducerFlowControl()){
+ if (context.isProducerFlowControl() && !context.isNetworkConnection()) {
if(usageManager.isSendFailIfNoSpace()&&usageManager.isFull()){
throw new javax.jms.ResourceAllocationException("Usage Manager memory limit reached");
}else{
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?view=diff&rev=514720&r1=514719&r2=514720
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java Mon Mar 5 08:26:08 2007
@@ -243,7 +243,7 @@
if( message.isExpired() ) {
return;
}
- if (context.isProducerFlowControl()) {
+ if (context.isProducerFlowControl() && !context.isNetworkConnection() ) {
if (usageManager.isSendFailIfNoSpace() && usageManager.isFull()) {
throw new javax.jms.ResourceAllocationException("Usage Manager memory limit reached");
} else {