You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2007/06/27 17:49:55 UTC

svn commit: r551207 [1/2] - in /incubator/qpid/trunk/qpid: ./ java/ java/broker/etc/ java/broker/src/main/java/org/apache/qpid/server/ java/broker/src/main/java/org/apache/qpid/server/exchange/ java/broker/src/main/java/org/apache/qpid/server/protocol/...

Author: ritchiem
Date: Wed Jun 27 08:49:51 2007
New Revision: 551207

URL: http://svn.apache.org/viewvc?view=rev&rev=551207
Log:
Merged revisions 550748-551121 via svnmerge from 
https://svn.apache.org/repos/asf/incubator/qpid/branches/M2

........
  r550748 | ritchiem | 2007-06-26 10:20:17 +0100 (Tue, 26 Jun 2007) | 1 line
  
  Added xml file for logging during sustained tests
........
  r550773 | rupertlssmith | 2007-06-26 12:03:04 +0100 (Tue, 26 Jun 2007) | 1 line
  
  Immediate and mandatory flag tests added.
........
  r550849 | rupertlssmith | 2007-06-26 17:43:58 +0100 (Tue, 26 Jun 2007) | 1 line
  
  QPID-509 Mandatory messages not returned outside a transaction. They are now.
........
  r551117 | ritchiem | 2007-06-27 11:51:34 +0100 (Wed, 27 Jun 2007) | 2 lines
  
  Update to the sustained test to ensure late joining occurs correctly and improved stabilisation. Additional system properties now documented on wiki.
  http://cwiki.apache.org/qpid/sustained-tests.html
........
  r551118 | ritchiem | 2007-06-27 11:51:51 +0100 (Wed, 27 Jun 2007) | 1 line
  
  Added intelij files to ignore list
........
  r551119 | ritchiem | 2007-06-27 11:55:34 +0100 (Wed, 27 Jun 2007) | 1 line
  
  POM update to add Apache content to built jars
........
  r551120 | ritchiem | 2007-06-27 11:58:25 +0100 (Wed, 27 Jun 2007) | 1 line
  
  Updated default guest password as it was not correct.
........
  r551121 | ritchiem | 2007-06-27 12:00:48 +0100 (Wed, 27 Jun 2007) | 1 line
  
  Added additional information to log message when available to aid the explination of a failed connection
........

Added:
    incubator/qpid/trunk/qpid/java/integrationtests/src/resources/sustained-log4j.xml
      - copied unchanged from r551121, incubator/qpid/branches/M2/java/integrationtests/src/resources/sustained-log4j.xml
Modified:
    incubator/qpid/trunk/qpid/   (props changed)
    incubator/qpid/trunk/qpid/java/broker/etc/md5passwd
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
    incubator/qpid/trunk/qpid/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/TestClient.java
    incubator/qpid/trunk/qpid/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestClient.java
    incubator/qpid/trunk/qpid/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestCoordinator.java
    incubator/qpid/trunk/qpid/java/management/eclipse-plugin/   (props changed)
    incubator/qpid/trunk/qpid/java/pom.xml
    incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/ImmediateMessageTest.java
    incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/MandatoryMessageTest.java
    incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/MessagingTestConfigProperties.java

Propchange: incubator/qpid/trunk/qpid/
------------------------------------------------------------------------------
Binary property 'svnmerge-integrated' - no diff available.

Modified: incubator/qpid/trunk/qpid/java/broker/etc/md5passwd
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/etc/md5passwd?view=diff&rev=551207&r1=551206&r2=551207
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/etc/md5passwd (original)
+++ incubator/qpid/trunk/qpid/java/broker/etc/md5passwd Wed Jun 27 08:49:51 2007
@@ -16,6 +16,6 @@
 # specific language governing permissions and limitations
 # under the License.
 #
-guest:qfgyy4ewnVMBg
+guest:CE4DQ6BIb/BVMN9scFyLtA==
 admin:ISMvKXpXpadDiUoOSoAfww==
 user:aBzonUodYLhwSa8s9A10sA==

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?view=diff&rev=551207&r1=551206&r2=551207
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java Wed Jun 27 08:49:51 2007
@@ -20,18 +20,9 @@
  */
 package org.apache.qpid.server;
 
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-
 import org.apache.log4j.Logger;
 
+
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.ContentBody;
@@ -52,6 +43,16 @@
 import org.apache.qpid.server.store.StoreContext;
 import org.apache.qpid.server.txn.*;
 
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
 public class AMQChannel
 {
     public static final int DEFAULT_PREFETCH = 5000;
@@ -208,7 +209,8 @@
         _currentMessage.setPublisher(publisher);
     }
 
-    public void publishContentHeader(ContentHeaderBody contentHeaderBody) throws AMQException
+    public void publishContentHeader(ContentHeaderBody contentHeaderBody, AMQProtocolSession protocolSession)
+        throws AMQException
     {
         if (_currentMessage == null)
         {
@@ -230,6 +232,7 @@
             // check and deliver if header says body length is zero
             if (contentHeaderBody.bodySize == 0)
             {
+                _txnContext.messageProcessed(protocolSession);
                 _currentMessage = null;
             }
         }

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java?view=diff&rev=551207&r1=551206&r2=551207
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java Wed Jun 27 08:49:51 2007
@@ -169,7 +169,7 @@
         if (queues == null || queues.isEmpty())
         {
             String msg = "Routing key " + routingKey + " is not known to " + this;
-            if (info.isMandatory())
+            if (info.isMandatory() || info.isImmediate())
             {
                 throw new NoRouteException(msg, payload, null);
             }

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java?view=diff&rev=551207&r1=551206&r2=551207
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java Wed Jun 27 08:49:51 2007
@@ -20,13 +20,18 @@
  */
 package org.apache.qpid.server.exchange;
 
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.StringTokenizer;
-import java.util.LinkedList;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArrayList;
+import org.apache.log4j.Logger;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.BasicPublishBody;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.server.management.MBeanConstructor;
+import org.apache.qpid.server.management.MBeanDescription;
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.AMQQueue;
 
 import javax.management.JMException;
 import javax.management.MBeanException;
@@ -41,24 +46,21 @@
 import javax.management.openmbean.TabularDataSupport;
 import javax.management.openmbean.TabularType;
 
-import org.apache.log4j.Logger;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.exchange.ExchangeDefaults;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.BasicPublishBody;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.server.management.MBeanConstructor;
-import org.apache.qpid.server.management.MBeanDescription;
-import org.apache.qpid.server.queue.AMQMessage;
-import org.apache.qpid.server.queue.AMQQueue;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.StringTokenizer;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
 
 public class DestWildExchange extends AbstractExchange
 {
     private static final Logger _logger = Logger.getLogger(DestWildExchange.class);
 
-    private ConcurrentHashMap<AMQShortString, List<AMQQueue>> _routingKey2queues = new ConcurrentHashMap<AMQShortString, List<AMQQueue>>();
-    //    private ConcurrentHashMap<AMQShortString, AMQQueue> _routingKey2queue = new ConcurrentHashMap<AMQShortString, AMQQueue>();
+    private ConcurrentHashMap<AMQShortString, List<AMQQueue>> _routingKey2queues =
+        new ConcurrentHashMap<AMQShortString, List<AMQQueue>>();
+    // private ConcurrentHashMap<AMQShortString, AMQQueue> _routingKey2queue = new ConcurrentHashMap<AMQShortString, AMQQueue>();
     private static final String TOPIC_SEPARATOR = ".";
     private static final String AMQP_STAR = "*";
     private static final String AMQP_HASH = "#";
@@ -90,7 +92,7 @@
                     queueList.add(q.getName().toString());
                 }
 
-                Object[] bindingItemValues = {key.toString(), queueList.toArray(new String[0])};
+                Object[] bindingItemValues = { key.toString(), queueList.toArray(new String[0]) };
                 CompositeData bindingData = new CompositeDataSupport(_bindingDataType, _bindingItemNames, bindingItemValues);
                 _bindingList.put(bindingData);
             }
@@ -118,7 +120,6 @@
 
     } // End of MBean class
 
-
     public AMQShortString getType()
     {
         return ExchangeDefaults.TOPIC_EXCHANGE_CLASS;
@@ -140,6 +141,7 @@
         {
             queueList = _routingKey2queues.get(routingKey);
         }
+
         if (!queueList.contains(queue))
         {
             queueList.add(queue);
@@ -165,8 +167,8 @@
 
         for (int index = 0; index < size; index++)
         {
-            //if there are more levels
-            if (index + 1 < size)
+            // if there are more levels
+            if ((index + 1) < size)
             {
                 if (_subscription.get(index).equals(AMQP_HASH))
                 {
@@ -175,7 +177,7 @@
                         // we don't need #.# delete this one
                         _subscription.remove(index);
                         size--;
-                        //redo this normalisation
+                        // redo this normalisation
                         index--;
                     }
 
@@ -186,7 +188,7 @@
                         _subscription.add(index + 1, _subscription.remove(index));
                     }
                 }
-            }//if we have more levels
+            } // if we have more levels
         }
 
         StringBuilder sb = new StringBuilder();
@@ -211,9 +213,9 @@
         List<AMQQueue> queues = getMatchedQueues(routingKey);
         // if we have no registered queues we have nothing to do
         // TODO: add support for the immediate flag
-        if (queues == null || queues.size() == 0)
+        if ((queues == null) || queues.isEmpty())
         {
-            if (info.isMandatory())
+            if (info.isMandatory() || info.isImmediate())
             {
                 String msg = "Topic " + routingKey + " is not known to " + this;
                 throw new NoRouteException(msg, payload, null);
@@ -222,6 +224,7 @@
             {
                 _logger.warn("No queues found for routing key " + routingKey);
                 _logger.warn("Routing map contains: " + _routingKey2queues);
+
                 return;
             }
         }
@@ -238,14 +241,15 @@
     public boolean isBound(AMQShortString routingKey, AMQQueue queue) throws AMQException
     {
         List<AMQQueue> queues = _routingKey2queues.get(normalize(routingKey));
-        return queues != null && queues.contains(queue);
-    }
 
+        return (queues != null) && queues.contains(queue);
+    }
 
     public boolean isBound(AMQShortString routingKey) throws AMQException
     {
         List<AMQQueue> queues = _routingKey2queues.get(normalize(routingKey));
-        return queues != null && !queues.isEmpty();
+
+        return (queues != null) && !queues.isEmpty();
     }
 
     public boolean isBound(AMQQueue queue) throws AMQException
@@ -257,6 +261,7 @@
                 return true;
             }
         }
+
         return false;
     }
 
@@ -279,12 +284,14 @@
                                    " with routing key " + routingKey + ". No queue was registered with that routing key", null);
 
         }
+
         boolean removedQ = queues.remove(queue);
         if (!removedQ)
         {
             throw new AMQException(null, "Queue " + queue + " was not registered with exchange " + this.getName() +
                                    " with routing key " + routingKey, null);
         }
+
         if (queues.isEmpty())
         {
             _routingKey2queues.remove(routingKey);
@@ -304,7 +311,6 @@
         }
     }
 
-
     private List<AMQQueue> getMatchedQueues(AMQShortString routingKey)
     {
         List<AMQQueue> list = new LinkedList<AMQQueue>();
@@ -334,7 +340,6 @@
                 queueList.add(queTok.nextToken());
             }
 
-
             int depth = 0;
             boolean matching = true;
             boolean done = false;
@@ -343,25 +348,26 @@
 
             while (matching && !done)
             {
-                if (queueList.size() == depth + queueskip || routingkeyList.size() == depth + routingskip)
+                if ((queueList.size() == (depth + queueskip)) || (routingkeyList.size() == (depth + routingskip)))
                 {
                     done = true;
 
                     // if it was the routing key that ran out of digits
-                    if (routingkeyList.size() == depth + routingskip)
+                    if (routingkeyList.size() == (depth + routingskip))
                     {
                         if (queueList.size() > (depth + queueskip))
-                        {            // a hash and it is the last entry
-                            matching = queueList.get(depth + queueskip).equals(AMQP_HASH) && queueList.size() == depth + queueskip + 1;
+                        { // a hash and it is the last entry
+                            matching =
+                                queueList.get(depth + queueskip).equals(AMQP_HASH)
+                                && (queueList.size() == (depth + queueskip + 1));
                         }
                     }
-                    else if (routingkeyList.size() > depth + routingskip)
+                    else if (routingkeyList.size() > (depth + routingskip))
                     {
                         // There is still more routing key to check
                         matching = false;
                     }
 
-
                     continue;
                 }
 
@@ -377,27 +383,33 @@
                     else if (queueList.get(depth + queueskip).equals(AMQP_HASH))
                     {
                         // Is this a # at the end
-                        if (queueList.size() == depth + queueskip + 1)
+                        if (queueList.size() == (depth + queueskip + 1))
                         {
                             done = true;
+
                             continue;
                         }
 
                         // otherwise # in the middle
-                        while (routingkeyList.size() > depth + routingskip)
+                        while (routingkeyList.size() > (depth + routingskip))
                         {
                             if (routingkeyList.get(depth + routingskip).equals(queueList.get(depth + queueskip + 1)))
                             {
                                 queueskip++;
                                 depth++;
+
                                 break;
                             }
+
                             routingskip++;
                         }
+
                         continue;
                     }
+
                     matching = false;
                 }
+
                 depth++;
             }
 

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java?view=diff&rev=551207&r1=551206&r2=551207
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java Wed Jun 27 08:49:51 2007
@@ -1,27 +1,36 @@
 /*
- *  Licensed to the Apache Software Foundation (ASF) under one
- *  or more contributor license agreements.  See the NOTICE file
- *  distributed with this work for additional information
- *  regarding copyright ownership.  The ASF licenses this file
- *  to you under the Apache License, Version 2.0 (the
- *  "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
  *
- *    http://www.apache.org/licenses/LICENSE-2.0
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
  *
- *  Unless required by applicable law or agreed to in writing,
- *  software distributed under the License is distributed on an
- *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- *  KIND, either express or implied.  See the License for the
- *  specific language governing permissions and limitations
- *  under the License.
+ *   http://www.apache.org/licenses/LICENSE-2.0
  *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  *
  */
-
 package org.apache.qpid.server.exchange;
 
-import java.util.concurrent.CopyOnWriteArraySet;
+import org.apache.log4j.Logger;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.server.management.MBeanConstructor;
+import org.apache.qpid.server.management.MBeanDescription;
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.AMQQueue;
 
 import javax.management.JMException;
 import javax.management.MBeanException;
@@ -36,16 +45,7 @@
 import javax.management.openmbean.TabularDataSupport;
 import javax.management.openmbean.TabularType;
 
-import org.apache.log4j.Logger;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.exchange.ExchangeDefaults;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.server.management.MBeanConstructor;
-import org.apache.qpid.server.management.MBeanDescription;
-import org.apache.qpid.server.queue.AMQMessage;
-import org.apache.qpid.server.queue.AMQQueue;
+import java.util.concurrent.CopyOnWriteArraySet;
 
 public class FanoutExchange extends AbstractExchange
 {
@@ -63,7 +63,7 @@
     private final class FanoutExchangeMBean extends ExchangeMBean
     {
         @MBeanConstructor("Creates an MBean for AMQ fanout exchange")
-        public FanoutExchangeMBean()  throws JMException
+        public FanoutExchangeMBean() throws JMException
         {
             super();
             _exchangeType = "fanout";
@@ -79,9 +79,7 @@
             {
                 String queueName = queue.getName().toString();
 
-
-
-                Object[] bindingItemValues = {queueName, new String[] {queueName}};
+                Object[] bindingItemValues = { queueName, new String[] { queueName } };
                 CompositeData bindingData = new CompositeDataSupport(_bindingDataType, _bindingItemNames, bindingItemValues);
                 _bindingList.put(bindingData);
             }
@@ -98,7 +96,7 @@
             }
 
             try
-            {                
+            {
                 queue.bind(new AMQShortString(binding), null, FanoutExchange.this);
             }
             catch (AMQException ex)
@@ -107,8 +105,7 @@
             }
         }
 
-    }// End of MBean class
-
+    } // End of MBean class
 
     protected ExchangeMBean createMBean() throws AMQException
     {
@@ -147,7 +144,6 @@
     {
         assert queue != null;
 
-
         if (!_queues.remove(queue))
         {
             throw new AMQException(null, "Queue " + queue + " was not registered with exchange " + this.getName() +
@@ -159,10 +155,10 @@
     {
         final MessagePublishInfo publishInfo = payload.getMessagePublishInfo();
         final AMQShortString routingKey = publishInfo.getRoutingKey();
-        if (_queues == null || _queues.isEmpty())
+        if ((_queues == null) || _queues.isEmpty())
         {
             String msg = "No queues bound to " + this;
-            if (publishInfo.isMandatory())
+            if (publishInfo.isMandatory() || publishInfo.isImmediate())
             {
                 throw new NoRouteException(msg, payload, null);
             }
@@ -193,12 +189,11 @@
     public boolean isBound(AMQShortString routingKey) throws AMQException
     {
 
-        return _queues != null && !_queues.isEmpty();
+        return (_queues != null) && !_queues.isEmpty();
     }
 
     public boolean isBound(AMQQueue queue) throws AMQException
     {
-
 
         return _queues.contains(queue);
     }

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java?view=diff&rev=551207&r1=551206&r2=551207
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java Wed Jun 27 08:49:51 2007
@@ -229,7 +229,7 @@
 
             String msg = "Exchange " + getName() + ": message not routable.";
 
-            if (payload.getMessagePublishInfo().isMandatory())
+            if (payload.getMessagePublishInfo().isMandatory() || payload.getMessagePublishInfo().isImmediate())
             {
                 throw new NoRouteException(msg, payload, null);
             }

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java?view=diff&rev=551207&r1=551206&r2=551207
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java Wed Jun 27 08:49:51 2007
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -20,24 +20,13 @@
  */
 package org.apache.qpid.server.protocol;
 
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.CopyOnWriteArraySet;
-import java.security.Principal;
-
-import javax.management.JMException;
-import javax.security.sasl.SaslServer;
-
 import org.apache.log4j.Logger;
+
 import org.apache.mina.common.IdleStatus;
 import org.apache.mina.common.IoServiceConfig;
 import org.apache.mina.common.IoSession;
 import org.apache.mina.transport.vmpipe.VmPipeAddress;
+
 import org.apache.qpid.AMQChannelException;
 import org.apache.qpid.AMQConnectionException;
 import org.apache.qpid.AMQException;
@@ -46,22 +35,34 @@
 import org.apache.qpid.common.ClientProperties;
 import org.apache.qpid.framing.*;
 import org.apache.qpid.pool.ReadWriteThreadModel;
+import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.protocol.AMQMethodEvent;
 import org.apache.qpid.protocol.AMQMethodListener;
-import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.server.AMQChannel;
-import org.apache.qpid.server.output.ProtocolOutputConverter;
-import org.apache.qpid.server.output.ProtocolOutputConverterRegistry;
 import org.apache.qpid.server.management.Managable;
 import org.apache.qpid.server.management.ManagedObject;
+import org.apache.qpid.server.output.ProtocolOutputConverter;
+import org.apache.qpid.server.output.ProtocolOutputConverterRegistry;
 import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.state.AMQStateManager;
 import org.apache.qpid.server.state.AMQState;
+import org.apache.qpid.server.state.AMQStateManager;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
 
-public class AMQMinaProtocolSession implements AMQProtocolSession,
-                                               Managable
+import javax.management.JMException;
+import javax.security.sasl.SaslServer;
+
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.security.Principal;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CopyOnWriteArraySet;
+
+public class AMQMinaProtocolSession implements AMQProtocolSession, Managable
 {
     private static final Logger _logger = Logger.getLogger(AMQProtocolSession.class);
 
@@ -111,25 +112,20 @@
     private ProtocolOutputConverter _protocolOutputConverter;
     private Principal _authorizedID;
 
-
     public ManagedObject getManagedObject()
     {
         return _managedObject;
     }
 
-
-    public AMQMinaProtocolSession(IoSession session, VirtualHostRegistry virtualHostRegistry,
-                                  AMQCodecFactory codecFactory)
-            throws AMQException
+    public AMQMinaProtocolSession(IoSession session, VirtualHostRegistry virtualHostRegistry, AMQCodecFactory codecFactory)
+        throws AMQException
     {
         _stateManager = new AMQStateManager(virtualHostRegistry, this);
         _minaProtocolSession = session;
         session.setAttachment(this);
 
-
         _codecFactory = codecFactory;
 
-
         try
         {
             IoServiceConfig config = session.getServiceConfig();
@@ -140,16 +136,15 @@
         catch (RuntimeException e)
         {
             e.printStackTrace();
-            //    throw e;
+            // throw e;
 
         }
 
-//        this(session, queueRegistry, exchangeRegistry, codecFactory, new AMQStateManager());
+        // this(session, queueRegistry, exchangeRegistry, codecFactory, new AMQStateManager());
     }
 
-    public AMQMinaProtocolSession(IoSession session, VirtualHostRegistry virtualHostRegistry,
-                                  AMQCodecFactory codecFactory, AMQStateManager stateManager)
-            throws AMQException
+    public AMQMinaProtocolSession(IoSession session, VirtualHostRegistry virtualHostRegistry, AMQCodecFactory codecFactory,
+        AMQStateManager stateManager) throws AMQException
     {
         _stateManager = stateManager;
         _minaProtocolSession = session;
@@ -182,8 +177,7 @@
         return (AMQProtocolSession) minaProtocolSession.getAttachment();
     }
 
-    public void dataBlockReceived(AMQDataBlock message)
-            throws Exception
+    public void dataBlockReceived(AMQDataBlock message) throws Exception
     {
         _lastReceived = message;
         if (message instanceof ProtocolInitiation)
@@ -203,8 +197,7 @@
         }
     }
 
-    private void frameReceived(AMQFrame frame)
-            throws AMQException
+    private void frameReceived(AMQFrame frame) throws AMQException
     {
         int channelId = frame.getChannel();
         AMQBody body = frame.getBodyFrame();
@@ -252,13 +245,13 @@
             String locales = "en_US";
 
             // Interfacing with generated code - be aware of possible changes to parameter order as versions change.
-            AMQFrame response = ConnectionStartBody.createAMQFrame((short) 0,
-                                                                   getProtocolMajorVersion(), getProtocolMinorVersion(),    // AMQP version (major, minor)
-                                                                   locales.getBytes(),    // locales
-                                                                   mechanisms.getBytes(),    // mechanisms
-                                                                   null,    // serverProperties
-                                                                   (short) getProtocolMajorVersion(),    // versionMajor
-                                                                   (short) getProtocolMinorVersion());    // versionMinor
+            AMQFrame response =
+                ConnectionStartBody.createAMQFrame((short) 0, getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor)
+                    locales.getBytes(), // locales
+                    mechanisms.getBytes(), // mechanisms
+                    null, // serverProperties
+                    (short) getProtocolMajorVersion(), // versionMajor
+                    (short) getProtocolMinorVersion()); // versionMinor
             _minaProtocolSession.write(response);
         }
         catch (AMQException e)
@@ -269,21 +262,19 @@
 
             // TODO: Close connection (but how to wait until message is sent?)
             // ritchiem 2006-12-04 will this not do?
-//                WriteFuture future = _minaProtocolSession.write(new ProtocolInitiation(pv[i][PROTOCOLgetProtocolMajorVersion()], pv[i][PROTOCOLgetProtocolMinorVersion()]));
-//                future.join();
-//                close connection
+            // WriteFuture future = _minaProtocolSession.write(new ProtocolInitiation(pv[i][PROTOCOLgetProtocolMajorVersion()], pv[i][PROTOCOLgetProtocolMinorVersion()]));
+            // future.join();
+            // close connection
 
         }
     }
 
-
     private void methodFrameReceived(int channelId, AMQMethodBody methodBody)
     {
 
-        final AMQMethodEvent<AMQMethodBody> evt = new AMQMethodEvent<AMQMethodBody>(channelId,
-                                                                                    methodBody);
+        final AMQMethodEvent<AMQMethodBody> evt = new AMQMethodEvent<AMQMethodBody>(channelId, methodBody);
 
-        //Check that this channel is not closing
+        // Check that this channel is not closing
         if (channelAwaitingClosure(channelId))
         {
             if ((evt.getMethod() instanceof ChannelCloseOkBody))
@@ -299,11 +290,11 @@
                 {
                     _logger.info("Channel[" + channelId + "] awaiting closure ignoring");
                 }
+
                 return;
             }
         }
 
-
         try
         {
             try
@@ -315,10 +306,10 @@
                 {
                     for (AMQMethodListener listener : _frameListeners)
                     {
-                        wasAnyoneInterested = listener.methodReceived(evt) ||
-                                              wasAnyoneInterested;
+                        wasAnyoneInterested = listener.methodReceived(evt) || wasAnyoneInterested;
                     }
                 }
+
                 if (!wasAnyoneInterested)
                 {
                     throw new AMQNoMethodHandlerException(evt, null);
@@ -332,6 +323,7 @@
                     {
                         _logger.info("Closing channel due to: " + e.getMessage());
                     }
+
                     writeFrame(e.getCloseFrame(channelId));
                     closeChannel(channelId);
                 }
@@ -341,14 +333,17 @@
                     {
                         _logger.debug("ChannelException occured on non-existent channel:" + e.getMessage());
                     }
+
                     if (_logger.isInfoEnabled())
                     {
                         _logger.info("Closing connection due to: " + e.getMessage());
                     }
+
                     closeSession();
 
-                    AMQConnectionException ce = evt.getMethod().getConnectionException(AMQConstant.CHANNEL_ERROR,
-                                                                                       AMQConstant.CHANNEL_ERROR.getName().toString());
+                    AMQConnectionException ce =
+                        evt.getMethod().getConnectionException(AMQConstant.CHANNEL_ERROR,
+                            AMQConstant.CHANNEL_ERROR.getName().toString());
 
                     _stateManager.changeState(AMQState.CONNECTION_CLOSING);
                     writeFrame(ce.getCloseFrame(channelId));
@@ -360,6 +355,7 @@
                 {
                     _logger.info("Closing connection due to: " + e.getMessage());
                 }
+
                 closeSession();
                 _stateManager.changeState(AMQState.CONNECTION_CLOSING);
                 writeFrame(e.getCloseFrame(channelId));
@@ -372,17 +368,17 @@
             {
                 listener.error(e);
             }
+
             _minaProtocolSession.close();
         }
     }
 
-
     private void contentHeaderReceived(int channelId, ContentHeaderBody body) throws AMQException
     {
 
         AMQChannel channel = getAndAssertChannel(channelId);
 
-        channel.publishContentHeader(body);
+        channel.publishContentHeader(body, this);
 
     }
 
@@ -427,15 +423,15 @@
         {
             throw new AMQException(AMQConstant.NOT_FOUND, "Channel not found with id:" + channelId, null);
         }
+
         return channel;
     }
 
     public AMQChannel getChannel(int channelId) throws AMQException
     {
-        final AMQChannel channel = ((channelId & CHANNEL_CACHE_SIZE) == channelId)
-                   ? _cachedChannels[channelId]
-                   : _channelMap.get(channelId);
-        if (channel == null || channel.isClosing())
+        final AMQChannel channel =
+            ((channelId & CHANNEL_CACHE_SIZE) == channelId) ? _cachedChannels[channelId] : _channelMap.get(channelId);
+        if ((channel == null) || channel.isClosing())
         {
             return null;
         }
@@ -466,8 +462,9 @@
 
         if (_channelMap.size() == _maxNoOfChannels)
         {
-            String errorMessage = toString() + ": maximum number of channels has been reached (" +
-                                  _maxNoOfChannels + "); can't create channel";
+            String errorMessage =
+                toString() + ": maximum number of channels has been reached (" + _maxNoOfChannels
+                + "); can't create channel";
             _logger.error(errorMessage);
             throw new AMQException(AMQConstant.NOT_ALLOWED, errorMessage, null);
         }
@@ -480,6 +477,7 @@
         {
             _cachedChannels[channelId] = channel;
         }
+
         checkForNotification();
     }
 
@@ -504,7 +502,7 @@
 
     public void commitTransactions(AMQChannel channel) throws AMQException
     {
-        if (channel != null && channel.isTransactional())
+        if ((channel != null) && channel.isTransactional())
         {
             channel.commit();
         }
@@ -512,7 +510,7 @@
 
     public void rollbackTransactions(AMQChannel channel) throws AMQException
     {
-        if (channel != null && channel.isTransactional())
+        if ((channel != null) && channel.isTransactional())
         {
             channel.rollback();
         }
@@ -597,6 +595,7 @@
         {
             channel.close(this);
         }
+
         _channelMap.clear();
         for (int i = 0; i <= CHANNEL_CACHE_SIZE; i++)
         {
@@ -615,6 +614,7 @@
             {
                 _managedObject.unregister();
             }
+
             for (Task task : _taskList)
             {
                 task.doTask(this);
@@ -687,6 +687,7 @@
             {
                 setContextKey(new AMQShortString(_clientProperties.getString(CLIENT_PROPERTIES_INSTANCE)));
             }
+
             if (_clientProperties.getString(ClientProperties.version.toString()) != null)
             {
                 _clientVersion = new AMQShortString(_clientProperties.getString(ClientProperties.version.toString()));
@@ -715,7 +716,7 @@
 
     public boolean isProtocolVersion(byte major, byte minor)
     {
-        return getProtocolMajorVersion() == major && getProtocolMinorVersion() == minor;
+        return (getProtocolMajorVersion() == major) && (getProtocolMinorVersion() == minor);
     }
 
     public VersionSpecificRegistry getRegistry()
@@ -723,13 +724,11 @@
         return _registry;
     }
 
-
     public Object getClientIdentifier()
     {
         return _minaProtocolSession.getRemoteAddress();
     }
 
-
     public VirtualHost getVirtualHost()
     {
         return _virtualHost;
@@ -769,6 +768,6 @@
 
     public String getClientVersion()
     {
-        return _clientVersion == null ? null : _clientVersion.toString();
+        return (_clientVersion == null) ? null : _clientVersion.toString();
     }
 }

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?view=diff&rev=551207&r1=551206&r2=551207
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java Wed Jun 27 08:49:51 2007
@@ -49,6 +49,16 @@
 import org.apache.qpid.server.store.StoreContext;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
+import javax.management.JMException;
+
+import java.text.MessageFormat;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
 /**
  * This is an AMQ Queue, and should not be confused with a JMS queue or any other abstraction like that. It is described
  * fully in RFC 006.
@@ -607,7 +617,7 @@
         delete();
     }
 
-    public void processGet(StoreContext storeContext, AMQMessage msg, boolean deliverFirst) throws AMQException
+    /*public void processGet(StoreContext storeContext, AMQMessage msg, boolean deliverFirst) throws AMQException
     {
         // fixme not sure what this is doing. should we be passing deliverFirst through here?
         // This code is not used so when it is perhaps it should
@@ -623,7 +633,7 @@
             // from the queue:
             dequeue(storeContext, msg);
         }
-    }
+    }*/
 
     // public DeliveryManager getDeliveryManager()
     // {

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java?view=diff&rev=551207&r1=551206&r2=551207
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java Wed Jun 27 08:49:51 2007
@@ -337,7 +337,14 @@
 
             if ((message == null) || message.equals(""))
             {
-                message = "Unable to Connect";
+                if (message == null)
+                {
+                    message = "Unable to Connect";
+                }
+                else // can only be "" if getMessage() returned it therfore lastException != null
+                {
+                    message = "Unable to Connect:" + lastException.getClass();
+                }
             }
 
             AMQException e = new AMQConnectionFailureException(message, null);

Modified: incubator/qpid/trunk/qpid/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/TestClient.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/TestClient.java?view=diff&rev=551207&r1=551206&r2=551207
==============================================================================
--- incubator/qpid/trunk/qpid/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/TestClient.java (original)
+++ incubator/qpid/trunk/qpid/java/integrationtests/src/main/java/org/apache/qpid/interop/testclient/TestClient.java Wed Jun 27 08:49:51 2007
@@ -252,7 +252,8 @@
     public static Connection createConnection(String connectionPropsResource, String clientID, String brokerUrl, String virtualHost)
     {
         log.debug("public static Connection createConnection(String connectionPropsResource = " + connectionPropsResource
-                  + ", String brokerUrl = " + brokerUrl + ", String virtualHost = " + virtualHost + "): called");
+                  + ", String brokerUrl = " + brokerUrl + ", String clientID = " + clientID
+                  + ", String virtualHost = " + virtualHost + " ): called");
 
         try
         {

Modified: incubator/qpid/trunk/qpid/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestClient.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestClient.java?view=diff&rev=551207&r1=551206&r2=551207
==============================================================================
--- incubator/qpid/trunk/qpid/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestClient.java (original)
+++ incubator/qpid/trunk/qpid/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestClient.java Wed Jun 27 08:49:51 2007
@@ -37,6 +37,7 @@
 import javax.jms.Session;
 import javax.jms.TextMessage;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 
@@ -91,7 +92,9 @@
 
 
     private static final long TEN_MILLI_SEC = 10000000;
-    private static final long FIVE_MILLI_SEC = 5000000;
+    private static final int DEBUG_LOG_UPATE_INTERVAL = 10;
+    private static final int LOG_UPATE_INTERVAL = 10;
+    private static final boolean SLEEP_PER_MESSAGE = Boolean.getBoolean("sleepPerMessage");
 
     /**
      * Should provide the name of the test case that this class implements. The exact names are defined in the interop
@@ -129,6 +132,7 @@
         String sendKey = assignRoleMessage.getStringProperty("SUSTAINED_KEY");
         String sendUpdateKey = assignRoleMessage.getStringProperty("SUSTAINED_UPDATE_KEY");
         int ackMode = assignRoleMessage.getIntProperty("ACKNOWLEDGE_MODE");
+        String clientName = assignRoleMessage.getStringProperty("CLIENT_NAME");
 
         if (debugLog.isDebugEnabled())
         {
@@ -150,7 +154,9 @@
                 session = new Session[1];
 
                 connection[0] =
-                        org.apache.qpid.interop.testclient.TestClient.createConnection(org.apache.qpid.interop.testclient.TestClient.DEFAULT_CONNECTION_PROPS_RESOURCE, org.apache.qpid.interop.testclient.TestClient.brokerUrl,
+                        org.apache.qpid.interop.testclient.TestClient.createConnection(org.apache.qpid.interop.testclient.TestClient.DEFAULT_CONNECTION_PROPS_RESOURCE,
+                                                                                       clientName,
+                                                                                       org.apache.qpid.interop.testclient.TestClient.brokerUrl,
                                                                                        org.apache.qpid.interop.testclient.TestClient.virtualHost);
                 session[0] = connection[0].createSession(false, ackMode);
 
@@ -182,6 +188,7 @@
                 {
                     connection[i] =
                             org.apache.qpid.interop.testclient.TestClient.createConnection(org.apache.qpid.interop.testclient.TestClient.DEFAULT_CONNECTION_PROPS_RESOURCE,
+                                                                                           clientName,
                                                                                            org.apache.qpid.interop.testclient.TestClient.brokerUrl,
                                                                                            org.apache.qpid.interop.testclient.TestClient.virtualHost);
                     session[i] = connection[i].createSession(false, ackMode);
@@ -192,7 +199,7 @@
 
                     MessageConsumer consumer = session[i].createConsumer(sendDestination);
 
-                    consumer.setMessageListener(new SustainedListener(TestClient.CLIENT_NAME + "-" + i, _batchSize, session[i], sendUpdateDestination));
+                    consumer.setMessageListener(new SustainedListener(clientName + "-" + i, _batchSize, session[i], sendUpdateDestination));
                 }
 
                 break;
@@ -347,7 +354,7 @@
                     _received++;
                     if (((TextMessage) message).getText().equals("start"))
                     {
-                        debugLog.info("Starting Batch");
+                        debugLog.debug("Starting Batch");
                         _startTime = System.nanoTime();
                     }
                     else if (((TextMessage) message).getText().equals("end"))
@@ -355,8 +362,8 @@
                         if (_startTime != null)
                         {
                             long currentTime = System.nanoTime();
-                            sendStatus(currentTime - _startTime, _received);
-                            debugLog.info("End Batch");
+                            sendStatus(currentTime - _startTime, _received, message.getIntProperty("BATCH"));
+                            debugLog.debug("End Batch");
                         }
                     }
                 }
@@ -373,28 +380,31 @@
          *
          * @param time     taken for the the last batch
          * @param received Total number of messages received.
-         *
+         * @param batchNumber the batch number
          * @throws JMSException if an error occurs during the send
          */
-        private void sendStatus(long time, long received) throws JMSException
+        private void sendStatus(long time, long received, int batchNumber) throws JMSException
         {
             Message updateMessage = _session.createTextMessage("update");
             updateMessage.setStringProperty("CLIENT_ID", ":" + _client);
             updateMessage.setStringProperty("CONTROL_TYPE", "UPDATE");
             updateMessage.setLongProperty("RECEIVED", received);
+            updateMessage.setIntProperty("BATCH", batchNumber);
             updateMessage.setLongProperty("DURATION", time);
 
             if (debugLog.isInfoEnabled())
             {
-                debugLog.info("**** SENDING [" + received / _batchSize + "]**** "
-                              + "CLIENT_ID:" + _client + " RECEIVED:" + received + " DURATION:" + time);
+                debugLog.info("**** SENDING [" + batchNumber + "]**** "
+                              + "CLIENT_ID:" + _client + " RECEIVED:" + received
+                              + " BATCH:" + batchNumber + " DURATION:" + time);
             }
 
             // Output on the main log.info the details of this batch
-            if (received / _batchSize % 10 == 0)
+            if (batchNumber % 10 == 0)
             {
-                log.info("Sending Report [" + received / _batchSize + "] "
-                         + "CLIENT_ID:" + _client + " RECEIVED:" + received + " DURATION:" + time);
+                log.info("Sending Report [" + batchNumber + "] "
+                         + "CLIENT_ID:" + _client + " RECEIVED:" + received
+                         + " BATCH:" + batchNumber + " DURATION:" + time);
             }
 
             _updater.send(updateMessage);
@@ -415,7 +425,7 @@
     class SustainedRateAdapter implements MessageListener, Runnable
     {
         private SustainedTestClient _client;
-        private long _messageVariance = 500; //no. messages to allow drifting
+        private long _batchVariance = 3; //no. batches to allow drifting
         private long _timeVariance = TEN_MILLI_SEC * 5; // no. nanos between send and report delay (10ms)
         private volatile long _delay;   //in nanos
         private long _sent;
@@ -451,18 +461,23 @@
                     long duration = message.getLongProperty("DURATION");
                     long totalReceived = message.getLongProperty("RECEIVED");
                     String client = message.getStringProperty("CLIENT_ID");
+                    int batchNumber = message.getIntProperty("BATCH");
 
-                    if (debugLog.isInfoEnabled())
+                    if (debugLog.isInfoEnabled() && batchNumber % DEBUG_LOG_UPATE_INTERVAL == 0)
                     {
-                        debugLog.info("Update Report: CLIENT_ID:" + client + " RECEIVED:" + totalReceived + " DURATION:" + duration);
+                        debugLog.info("Update Report: CLIENT_ID:" + client + " RECEIVED:" + totalReceived
+                                      + " Recevied BATCH:" + batchNumber + " DURATION:" + duration);
                     }
 
-                    recordSlow(client, totalReceived);
-
-                    adjustDelay(client, totalReceived, duration);
+                    recordSlow(client, totalReceived, batchNumber);
 
+                    adjustDelay(client, batchNumber, duration);
 
-                    if (!_warmedup && _totalReceived / _batchSize / delays.size() == _warmUpBatches / 2)
+                    // Warm up completes when:
+                    // we haven't warmed up
+                    // and the number of batches sent to each client is at least half of the required warmup batches
+                    if (!_warmedup
+                        && (batchNumber >= _warmUpBatches))
                     {
                         _warmedup = true;
                         _warmup.countDown();
@@ -478,7 +493,7 @@
 
         CountDownLatch _warmup = new CountDownLatch(1);
 
-        int _warmUpBatches = 20;
+        int _warmUpBatches = Integer.getInteger("warmUpBatches", 10);
 
         int _numBatches = 10000;
 
@@ -527,12 +542,14 @@
                 testMessage = _client.session[0].createTextMessage("start");
 
 
-                for (int batch = 0; batch < batchSize; batch++)
+                for (int batch = 0; batch <= batchSize; batch++)
 //                while (_running)
                 {
                     long start = System.nanoTime();
 
                     testMessage.setText("start");
+                    testMessage.setIntProperty("BATCH", batch);
+
                     _client.producer.send(testMessage);
                     _rateAdapter.sentMessage();
 
@@ -552,9 +569,12 @@
 
                     long sendtime = end - start;
 
-                    debugLog.info("Sent batch[" + batch + "](" + _batchSize + ") in " + sendtime);//timings[batch]);
+                    if (debugLog.isDebugEnabled())
+                    {
+                        debugLog.info("Sent batch[" + batch + "](" + _batchSize + ") in " + sendtime);//timings[batch]);
+                    }
 
-                    if (batch % 10 == 0)
+                    if (batch % LOG_UPATE_INTERVAL == 0)
                     {
                         log.info("Sent Batch[" + batch + "](" + _batchSize + ")" + status());
                     }
@@ -583,23 +603,17 @@
                 return;
             }
 
-            //Slow down if gap between send and received is too large
-            if (_sent - _totalReceived / delays.size() > _messageVariance)
-            {
-                //pause between batches.
-                debugLog.info("Sleeping to keep sent in check with received");
-                log.debug("Increaseing _delay as sending more than receiving");
-                _delay += TEN_MILLI_SEC;
-            }
-
-            //per batch sleep.. if sleep is to small to spread over the batch.
-            if (_delay <= TEN_MILLI_SEC * _batchSize)
-            {
-                sleepLong(_delay);
-            }
-            else
+            if (!SLEEP_PER_MESSAGE)
             {
-                debugLog.info("Not sleeping _delay > ten*batch is:" + _delay);
+                //per batch sleep.. if sleep is to small to spread over the batch.
+                if (_delay <= TEN_MILLI_SEC * _batchSize)
+                {
+                    sleepLong(_delay);
+                }
+                else
+                {
+                    debugLog.info("Not sleeping _delay > ten*batch is:" + _delay);
+                }
             }
         }
 
@@ -617,10 +631,10 @@
          * Adjust the delay for sending messages based on this update from the client
          *
          * @param client        The client that send this update
-         * @param totalReceived The number of messages that this client has received.
          * @param duration      The time taken for the last batch of messagse
+         * @param batchNumber   The reported batchnumber from the client 
          */
-        private void adjustDelay(String client, long totalReceived, long duration)
+        private void adjustDelay(String client, int batchNumber, long duration)
         {
             //Retrieve the current total time taken for this client.
             Long currentTime = delays.get(client);
@@ -637,23 +651,28 @@
 
             delays.put(client, currentTime);
 
+            long batchesSent = _sent / _batchSize;
+
+            // ensure we don't divide by zero
+            if (batchesSent == 0)
+            {
+                batchesSent = 1L;
+            }
 
             _totalReceived += _batchSize;
             _totalDuration += duration;
 
-            // Calculate the number of messages in the batch.
-            long batchCount = (_totalReceived / _batchSize);
-
             //calculate average duration accross clients per batch
-            long averageDuration = _totalDuration / delays.size() / batchCount;
+            long averageDuration = _totalDuration / delays.size() / batchesSent;
 
             //calculate the difference between current send delay and average report delay
             long diff = (duration) - averageDuration;
 
-            if (debugLog.isInfoEnabled())
+            if (debugLog.isInfoEnabled() && batchNumber % DEBUG_LOG_UPATE_INTERVAL == 0)
             {
-                debugLog.info("TotalDuration:" + _totalDuration + " for " + delays.size() + " consumers"
-                              + " on batch: " + batchCount
+                debugLog.info("TotalDuration:" + _totalDuration + " for " + delays.size() + " consumers."
+                              + " on batch: " + batchesSent
+                              + " received batch: " + batchNumber
                               + " Batch Duration: " + duration
                               + " Average: " + averageDuration
                               + " so diff: " + diff + " for : " + client
@@ -696,6 +715,16 @@
                 delayStable();
             }
 
+            // If we have a consumer that is behind with the batches.
+            if (batchesSent - batchNumber > _batchVariance)
+            {
+                debugLog.debug("Increasing _delay as sending more than receiving");
+
+                _delay += 2 * TEN_MILLI_SEC;
+                delayChanged();
+            }
+
+
         }
 
         /** Reset the number of iterations before we say the delay has stabilised. */
@@ -725,10 +754,11 @@
          *
          * @param client   The client identifier to check
          * @param received the number of messages received by that client
+         * @param batchNumber
          */
-        private void recordSlow(String client, long received)
+        private void recordSlow(String client, long received, int batchNumber)
         {
-            if (received < (_sent - _messageVariance))
+            if (Math.abs(batchNumber - (_sent / _batchSize)) > _batchVariance)
             {
                 _slowClients.put(client, received);
             }
@@ -761,6 +791,13 @@
                     }
                 }
             }
+            else
+            {
+                if (SLEEP_PER_MESSAGE && (_delay > 0))
+                {
+                    sleepLong(_delay / _batchSize);
+                }
+            }
         }
 
 
@@ -771,16 +808,38 @@
          */
         private boolean checkForSlowClients()
         {
-            if (_sent % _batchSize == 0)
+            // This will allways be true as we are running this at the end of each batchSize
+//            if (_sent % _batchSize == 0)
             {
                 // Cause test to pause when we have slow
                 if (!_slowClients.isEmpty() || NO_CLIENTS)
                 {
-                    debugLog.info("Pausing for slow clients:" + _slowClients.entrySet().toArray());
+
 
                     while (!_slowClients.isEmpty())
                     {
-                        debugLog.info(_slowClients.size() + " slow clients.");
+                        if (debugLog.isInfoEnabled()
+                            && _sent / _batchSize % DEBUG_LOG_UPATE_INTERVAL == 0)
+                        {
+                            String clients = "";
+                            Iterator it = _slowClients.keySet().iterator();
+                            while (it.hasNext())
+                            {
+                                clients += it.next();
+                                if (it.hasNext())
+                                {
+                                    clients += ", ";
+                                }
+                            }
+                            debugLog.info("Pausing for slow clients:" + clients);
+                        }
+
+
+                        if (log.isDebugEnabled()
+                            && _sent / _batchSize % LOG_UPATE_INTERVAL == 0)
+                        {
+                            log.debug(_slowClients.size() + " slow clients.");
+                        }
                         sleep(PAUSE_SLEEP);
                     }
 
@@ -794,7 +853,11 @@
                 }
                 else
                 {
-                    debugLog.info("Delay:" + _delay);
+                    if (_sent / _batchSize % LOG_UPATE_INTERVAL == 0)
+                    {
+                        log.info("Total Delay :" + _delay + " "
+                                 + (_delayShifting == 0 ? "Stablised" : "Not Stablised(" + _delayShifting + ")"));
+                    }
                 }
 
             }
@@ -825,7 +888,7 @@
          * Perform the sleep , swallowing any InteruptException.
          *
          * NOTE: If a sleep request is > 10s then reset only sleep for 5s
-         *  
+         *
          * @param milli to sleep for
          * @param nano sub miliseconds to sleep for
          */

Modified: incubator/qpid/trunk/qpid/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestCoordinator.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestCoordinator.java?view=diff&rev=551207&r1=551206&r2=551207
==============================================================================
--- incubator/qpid/trunk/qpid/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestCoordinator.java (original)
+++ incubator/qpid/trunk/qpid/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedTestCoordinator.java Wed Jun 27 08:49:51 2007
@@ -113,6 +113,7 @@
         setPropertiesOnMessage(assignSender, testProperties);
         assignSender.setStringProperty("CONTROL_TYPE", "ASSIGN_ROLE");
         assignSender.setStringProperty("ROLE", "SENDER");
+        assignSender.setStringProperty("CLIENT_NAME", "Sustained_SENDER");
 
         senderConversation.send(senderControlTopic, assignSender);
 
@@ -170,6 +171,7 @@
             setPropertiesOnMessage(assignReceiver, _testProperties);
             assignReceiver.setStringProperty("CONTROL_TYPE", "ASSIGN_ROLE");
             assignReceiver.setStringProperty("ROLE", "RECEIVER");
+            assignReceiver.setStringProperty("CLIENT_NAME", "Sustained_RECEIVER_" + receiver.clientName);
 
             receiverConversation.send(receiverControlTopic, assignReceiver);
 

Propchange: incubator/qpid/trunk/qpid/java/management/eclipse-plugin/
------------------------------------------------------------------------------
--- svn:ignore (original)
+++ svn:ignore Wed Jun 27 08:49:51 2007
@@ -1,2 +1,4 @@
 target
 *.iml
+org.apache.qpid.management.ui.ipr
+org.apache.qpid.management.ui.iws

Modified: incubator/qpid/trunk/qpid/java/pom.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/pom.xml?view=diff&rev=551207&r1=551206&r2=551207
==============================================================================
--- incubator/qpid/trunk/qpid/java/pom.xml (original)
+++ incubator/qpid/trunk/qpid/java/pom.xml Wed Jun 27 08:49:51 2007
@@ -391,6 +391,29 @@
                     <version>0.5</version>
                 </plugin>
 
+         <plugin>
+               <artifactId>maven-remote-resources-plugin</artifactId>
+               <version>1.0-alpha-5</version>
+               <executions>
+                   <execution>
+                       <goals>
+                           <goal>process</goal>
+                       </goals>
+                       <configuration>
+                           <resourceBundles>
+                               <resourceBundle>org.apache:apache-incubator-disclaimer-resource-bundle:1.1</resourceBundle>
+                               <resourceBundle>org.apache:apache-jar-resource-bundle:1.2</resourceBundle>
+                           </resourceBundles>
+                           <properties>
+                               <addLicense>true</addLicense>
+                               <projectName>Apache Qpid</projectName>
+                           </properties>
+                       </configuration>
+                   </execution>
+               </executions>
+           </plugin>
+
+
             </plugins>
         </pluginManagement>