You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/10/01 18:46:38 UTC

svn commit: r820713 - in /activemq/trunk/activemq-core: pom.xml src/main/java/org/apache/activemq/broker/ConnectionContext.java src/main/java/org/apache/activemq/broker/ProducerBrokerExchange.java src/main/java/org/apache/activemq/broker/region/Queue.java

Author: chirino
Date: Thu Oct  1 16:46:37 2009
New Revision: 820713

URL: http://svn.apache.org/viewvc?rev=820713&view=rev
Log:
AMQ-2435: NullPointer Exception Occurs when using producer flow control

 When producer-window-based flow control kicks in, we now copy the context, since the
 original will be changed while the message send request is waiting for space on the queue.


Modified:
    activemq/trunk/activemq-core/pom.xml
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ProducerBrokerExchange.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java

Modified: activemq/trunk/activemq-core/pom.xml
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/pom.xml?rev=820713&r1=820712&r2=820713&view=diff
==============================================================================
--- activemq/trunk/activemq-core/pom.xml (original)
+++ activemq/trunk/activemq-core/pom.xml Thu Oct  1 16:46:37 2009
@@ -599,8 +599,8 @@
           <phase>process-classes</phase>
             <configuration>
               <namespace>http://activemq.apache.org/schema/core</namespace>
-              <schema>${basedir}/target/classes/activemq.xsd</schema>
-              <outputDir>${basedir}/target/classes</outputDir>
+              <schema>${basedir}/target/generated-sources/xbean/activemq.xsd</schema>
+              <outputDir>${basedir}/target/generated-sources/xbean</outputDir>
               <generateSpringSchemasFile>false</generateSpringSchemasFile>
               <excludedClasses>org.apache.activemq.broker.jmx.AnnotatedMBean,org.apache.activemq.broker.jmx.DestinationViewMBean</excludedClasses>
             </configuration>

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java?rev=820713&r1=820712&r2=820713&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java Thu Oct  1 16:46:37 2009
@@ -73,6 +73,32 @@
         setUserName(info.getUserName());
         setConnectionId(info.getConnectionId());
     }
+    
+    public ConnectionContext copy() {
+        ConnectionContext rc = new ConnectionContext(this.messageEvaluationContext);
+        rc.connection = this.connection;
+        rc.connector = this.connector;
+        rc.broker = this.broker;
+        rc.inRecoveryMode = this.inRecoveryMode;
+        rc.transaction = this.transaction;
+        rc.transactions = this.transactions;
+        rc.securityContext = this.securityContext;
+        rc.connectionId = this.connectionId;
+        rc.clientId = this.clientId;
+        rc.userName = this.userName;
+        rc.haAware = this.haAware;
+        rc.wireFormatInfo = this.wireFormatInfo;
+        rc.longTermStoreContext = this.longTermStoreContext;
+        rc.producerFlowControl = this.producerFlowControl;
+        rc.messageAuthorizationPolicy = this.messageAuthorizationPolicy;
+        rc.networkConnection = this.networkConnection;
+        rc.faultTolerant = this.faultTolerant;
+        rc.stopping.set(this.stopping.get());
+        rc.dontSendReponse = this.dontSendReponse;
+        rc.clientMaster = this.clientMaster;
+        return rc;
+    }
+
 
     public SecurityContext getSecurityContext() {
         return securityContext;
@@ -293,4 +319,5 @@
     public void setFaultTolerant(boolean faultTolerant) {
         this.faultTolerant = faultTolerant;
     }
+
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ProducerBrokerExchange.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ProducerBrokerExchange.java?rev=820713&r1=820712&r2=820713&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ProducerBrokerExchange.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ProducerBrokerExchange.java Thu Oct  1 16:46:37 2009
@@ -36,6 +36,17 @@
     public ProducerBrokerExchange() {
     }
 
+    public ProducerBrokerExchange copy() {
+        ProducerBrokerExchange rc = new ProducerBrokerExchange();
+        rc.connectionContext = connectionContext.copy();
+        rc.regionDestination = regionDestination;
+        rc.region = region;
+        rc.producerState = producerState;
+        rc.mutable = mutable;
+        return rc;
+    }
+
+    
     /**
      * @return the connectionContext
      */
@@ -105,4 +116,5 @@
     public void setProducerState(ProducerState producerState) {
         this.producerState = producerState;
     }
+
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=820713&r1=820712&r2=820713&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Thu Oct  1 16:46:37 2009
@@ -428,6 +428,9 @@
                 // a sync message or
                 // if it is using a producer window
                 if (producerInfo.getWindowSize() > 0 || message.isResponseRequired()) {
+                    // copy the exchange state since the context will be modified while we are waiting
+                    // for space.
+                    final ProducerBrokerExchange producerExchangeCopy = producerExchange.copy(); 
                     synchronized (messagesWaitingForSpace) {
                         messagesWaitingForSpace.add(new Runnable() {
                             public void run() {
@@ -439,7 +442,7 @@
                                         broker.messageExpired(context, message);
                                         destinationStatistics.getExpired().increment();
                                     } else {
-                                        doMessageSend(producerExchange, message);
+                                        doMessageSend(producerExchangeCopy, message);
                                     }
     
                                     if (sendProducerAck) {