You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/06/18 19:31:07 UTC

svn commit: r786177 - in /activemq/sandbox/activemq-flow: activemq-broker/src/main/java/org/apache/activemq/apollo/broker/ activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/ activemq-openwire/src/main/java/org/apache/activemq/command/...

Author: chirino
Date: Thu Jun 18 17:31:06 2009
New Revision: 786177

URL: http://svn.apache.org/viewvc?rev=786177&view=rev
Log:
Working on making more tests pass in the BrokerTest class.
 * Composite sending works now
 * Temp domains added
 * Creating destinations (even temp ones) is now supported in the openwire protocol
 * Made the ActiveMQDestination classes implement the apollo Destination interface to avoid extra object allocations

Modified:
    activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Router.java
    activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java
    activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
    activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQDestination.java
    activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQQueue.java
    activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQTempDestination.java
    activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQTempQueue.java
    activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQTempTopic.java
    activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQTopic.java
    activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/legacy/openwireprotocol/BrokerTest.java
    activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/protobuf/AsciiBuffer.java

Modified: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Router.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Router.java?rev=786177&r1=786176&r2=786177&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Router.java (original)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Router.java Thu Jun 18 17:31:06 2009
@@ -34,6 +34,8 @@
 
     public static final AsciiBuffer TOPIC_DOMAIN = new AsciiBuffer("topic");
     public static final AsciiBuffer QUEUE_DOMAIN = new AsciiBuffer("queue");
+    public static final AsciiBuffer TEMP_TOPIC_DOMAIN = new AsciiBuffer("temp-topic");
+    public static final AsciiBuffer TEMP_QUEUE_DOMAIN = new AsciiBuffer("temp-queue");
 
     private final HashMap<AsciiBuffer, Domain> domains = new HashMap<AsciiBuffer, Domain>();
     private VirtualHost virtualHost;
@@ -42,6 +44,8 @@
     public Router() {
         domains.put(QUEUE_DOMAIN, new QueueDomain());
         domains.put(TOPIC_DOMAIN, new TopicDomain());
+        domains.put(TEMP_QUEUE_DOMAIN, new QueueDomain());
+        domains.put(TEMP_TOPIC_DOMAIN, new TopicDomain());
     }
 
     public Domain getDomain(AsciiBuffer name) {

Modified: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java?rev=786177&r1=786176&r2=786177&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java (original)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenWireMessageDelivery.java Thu Jun 18 17:31:06 2009
@@ -33,7 +33,6 @@
     static final private AsciiBuffer ENCODING = new AsciiBuffer("openwire");
 
     private final Message message;
-    private Destination destination;
     private AsciiBuffer producerId;
     private OpenWireFormat storeWireFormat;
     private PersistListener persistListener = null;
@@ -53,10 +52,7 @@
     }
 
     public Destination getDestination() {
-        if (destination == null) {
-            destination = OpenwireProtocolHandler.convert(message.getDestination());
-        }
-        return destination;
+        return message.getDestination();
     }
 
     public int getMemorySize() {

Modified: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java?rev=786177&r1=786176&r2=786177&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java (original)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/broker/openwire/OpenwireProtocolHandler.java Thu Jun 18 17:31:06 2009
@@ -17,6 +17,7 @@
 package org.apache.activemq.broker.openwire;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.LinkedList;
 
@@ -30,7 +31,6 @@
 import org.apache.activemq.apollo.broker.ProtocolHandler;
 import org.apache.activemq.apollo.broker.Router;
 import org.apache.activemq.apollo.broker.VirtualHost;
-import org.apache.activemq.apollo.broker.BrokerSubscription.UserAlreadyConnectedException;
 import org.apache.activemq.broker.openwire.OpenWireMessageDelivery.PersistListener;
 import org.apache.activemq.broker.store.Store.MessageRecord;
 import org.apache.activemq.command.ActiveMQDestination;
@@ -81,7 +81,6 @@
 import org.apache.activemq.flow.SizeLimiter;
 import org.apache.activemq.flow.ISinkController.FlowControllable;
 import org.apache.activemq.openwire.OpenWireFormat;
-import org.apache.activemq.protobuf.AsciiBuffer;
 import org.apache.activemq.protobuf.Buffer;
 import org.apache.activemq.selector.SelectorParser;
 import org.apache.activemq.state.CommandVisitor;
@@ -100,6 +99,8 @@
     private Router router;
     private VirtualHost host;
     private final CommandVisitor visitor;
+    
+    ArrayList<ActiveMQDestination> temporaryDestinations = new ArrayList<ActiveMQDestination>(); 
 
     public OpenwireProtocolHandler() {
         setStoreWireFormat(new OpenWireFormat());
@@ -159,6 +160,10 @@
             // Message Processing Methods.
             // /////////////////////////////////////////////////////////////////
             public Response processMessage(Message info) throws Exception {
+            	if( info.getOriginalDestination() == null ) {
+            		info.setOriginalDestination(info.getDestination());
+            	}
+            	
                 ProducerId producerId = info.getProducerId();
                 ProducerContext producerContext = producers.get(producerId);
 
@@ -249,7 +254,14 @@
             // Methods for server management
             // /////////////////////////////////////////////////////////////////
             public Response processAddDestination(DestinationInfo info) throws Exception {
-                throw new UnsupportedOperationException();
+            	ActiveMQDestination destination = info.getDestination();
+				if( destination.isTemporary() ) {
+					// Keep track of it so that we can remove it when this connection
+					// shuts down.
+            		temporaryDestinations.add(destination);
+            	}
+            	host.createQueue(destination);
+                return ack(info);
             }
 
             public Response processRemoveDestination(DestinationInfo info) throws Exception {
@@ -577,7 +589,7 @@
          * #getDestination()
          */
         public Destination getDestination() {
-            return convert(info.getDestination());
+            return info.getDestination();
         }
 
         /*
@@ -681,26 +693,6 @@
 
     }
 
-    static public Destination convert(ActiveMQDestination dest) {
-        if (dest.isComposite()) {
-            ActiveMQDestination[] compositeDestinations = dest.getCompositeDestinations();
-            Destination.MultiDestination md = new Destination.MultiDestination();
-            for (int i = 0; i < compositeDestinations.length; i++) {
-                md.add(convert(compositeDestinations[i]));
-            }
-            return md;
-        }
-        AsciiBuffer domain;
-        if (dest.isQueue()) {
-            domain = Router.QUEUE_DOMAIN;
-        } else if (dest.isTopic()) {
-            domain = Router.TOPIC_DOMAIN;
-        } else {
-            throw new IllegalArgumentException("Unsupported domain type: " + dest);
-        }
-        return new Destination.SingleDestination(domain, new AsciiBuffer(dest.getPhysicalName()));
-    }
-
     private static BooleanExpression parseSelector(ConsumerInfo info) throws FilterException {
         BooleanExpression rc = null;
         if (info.getSelector() != null) {

Modified: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQDestination.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQDestination.java?rev=786177&r1=786176&r2=786177&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQDestination.java (original)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQDestination.java Thu Jun 18 17:31:06 2009
@@ -22,6 +22,8 @@
 import java.io.ObjectOutput;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -35,6 +37,7 @@
 import javax.jms.Topic;
 
 import org.apache.activemq.jndi.JNDIBaseStorable;
+import org.apache.activemq.protobuf.AsciiBuffer;
 import org.apache.activemq.util.IntrospectionSupport;
 import org.apache.activemq.util.URISupport;
 
@@ -42,7 +45,7 @@
  * @openwire:marshaller
  * @version $Revision: 1.10 $
  */
-public abstract class ActiveMQDestination extends JNDIBaseStorable implements DataStructure, Destination, Externalizable, Comparable {
+public abstract class ActiveMQDestination extends JNDIBaseStorable implements DataStructure, Destination, Externalizable, Comparable, org.apache.activemq.apollo.broker.Destination {
 
     public static final String PATH_SEPERATOR = ".";
     public static final char COMPOSITE_SEPERATOR = ',';
@@ -62,7 +65,7 @@
 
     private static final long serialVersionUID = -3885260014960795889L;
 
-    protected String physicalName;
+    protected AsciiBuffer physicalName;
 
     protected transient ActiveMQDestination[] compositeDestinations;
     protected transient String[] destinationPaths;
@@ -81,6 +84,21 @@
         setCompositeDestinations(composites);
     }
 
+    
+    public AsciiBuffer getName() {
+    	return physicalName;
+    }
+    
+    abstract public AsciiBuffer getDomain();
+    
+	public Collection<org.apache.activemq.apollo.broker.Destination> getDestinations() {
+		if( !isComposite() ) {
+			return null; 
+		}
+		List<org.apache.activemq.apollo.broker.Destination> t = (List)Arrays.asList(compositeDestinations);
+		return t;
+	}
+
 
     // static helper methods for working with destinations
     // -------------------------------------------------------------------------
@@ -185,12 +203,12 @@
                 sb.append(destinations[i].getQualifiedName());
             }
         }
-        physicalName = sb.toString();
+        physicalName = new AsciiBuffer(sb.toString());
     }
 
     public String getQualifiedName() {
         if (isComposite()) {
-            return physicalName;
+            return physicalName.toString();
         }
         return getQualifiedPrefix() + physicalName;
     }
@@ -201,16 +219,16 @@
      * @openwire:property version=1
      */
     public String getPhysicalName() {
-        return physicalName;
+        return physicalName.toString();
     }
 
-    public void setPhysicalName(String physicalName) {
-        final int len = physicalName.length();
+    public void setPhysicalName(String value) {
+        final int len = value.length();
         // options offset
         int p = -1;
         boolean composite = false;
         for (int i = 0; i < len; i++) {
-            char c = physicalName.charAt(i);
+            char c = value.charAt(i);
             if (c == '?') {
                 p = i;
                 break;
@@ -225,21 +243,21 @@
         }
         // Strip off any options
         if (p >= 0) {
-            String optstring = physicalName.substring(p + 1);
-            physicalName = physicalName.substring(0, p);
+            String optstring = value.substring(p + 1);
+            value = value.substring(0, p);
             try {
                 options = URISupport.parseQuery(optstring);
             } catch (URISyntaxException e) {
-                throw new IllegalArgumentException("Invalid destination name: " + physicalName + ", it's options are not encoded properly: " + e);
+                throw new IllegalArgumentException("Invalid destination name: " + value + ", it's options are not encoded properly: " + e);
             }
         }
-        this.physicalName = physicalName;
+        this.physicalName = new AsciiBuffer(value);
         this.destinationPaths = null;
         this.hashValue = 0;
         if (composite) {
             // Check to see if it is a composite.
             List<String> l = new ArrayList<String>();
-            StringTokenizer iter = new StringTokenizer(physicalName, "" + COMPOSITE_SEPERATOR);
+            StringTokenizer iter = new StringTokenizer(value, "" + COMPOSITE_SEPERATOR);
             while (iter.hasMoreTokens()) {
                 String name = iter.nextToken().trim();
                 if (name.length() == 0) {
@@ -268,7 +286,7 @@
         }
 
         List<String> l = new ArrayList<String>();
-        StringTokenizer iter = new StringTokenizer(physicalName, PATH_SEPERATOR);
+        StringTokenizer iter = new StringTokenizer(physicalName.toString(), PATH_SEPERATOR);
         while (iter.hasMoreTokens()) {
             String name = iter.nextToken().trim();
             if (name.length() == 0) {

Modified: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQQueue.java?rev=786177&r1=786176&r2=786177&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQQueue.java (original)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQQueue.java Thu Jun 18 17:31:06 2009
@@ -16,9 +16,15 @@
  */
 package org.apache.activemq.command;
 
+import java.util.Collection;
+
 import javax.jms.JMSException;
 import javax.jms.Queue;
 
+import org.apache.activemq.apollo.broker.Destination;
+import org.apache.activemq.apollo.broker.Router;
+import org.apache.activemq.protobuf.AsciiBuffer;
+
 /**
  * 
  * @org.apache.xbean.XBean element="queue" description="An ActiveMQ Queue
@@ -59,4 +65,9 @@
         return QUEUE_QUALIFIED_PREFIX;
     }
 
+	@Override
+	public AsciiBuffer getDomain() {
+		return Router.QUEUE_DOMAIN;
+	}
+
 }

Modified: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQTempDestination.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQTempDestination.java?rev=786177&r1=786176&r2=786177&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQTempDestination.java (original)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQTempDestination.java Thu Jun 18 17:31:06 2009
@@ -65,9 +65,10 @@
             // Parse off the sequenceId off the end.
             // this can fail if the temp destination is
             // generated by another JMS system via the JMS<->JMS Bridge
-            int p = this.physicalName.lastIndexOf(":");
+        	String value = physicalName.toString();
+            int p = value.lastIndexOf(":");
             if (p >= 0) {
-                String seqStr = this.physicalName.substring(p + 1).trim();
+                String seqStr = value.substring(p + 1).trim();
                 if (seqStr != null && seqStr.length() > 0) {
                     try {
                         sequenceId = Integer.parseInt(seqStr);
@@ -75,7 +76,7 @@
                         LOG.debug("Did not parse sequence Id from " + physicalName);
                     }
                     // The rest should be the connection id.
-                    connectionId = this.physicalName.substring(0, p);
+                    connectionId = value.substring(0, p);
                 }
             }
         }

Modified: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQTempQueue.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQTempQueue.java?rev=786177&r1=786176&r2=786177&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQTempQueue.java (original)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQTempQueue.java Thu Jun 18 17:31:06 2009
@@ -19,6 +19,9 @@
 import javax.jms.JMSException;
 import javax.jms.TemporaryQueue;
 
+import org.apache.activemq.apollo.broker.Router;
+import org.apache.activemq.protobuf.AsciiBuffer;
+
 /**
  * @openwire:marshaller code="102"
  * @version $Revision: 1.6 $
@@ -59,4 +62,9 @@
         return TEMP_QUEUE_QUALIFED_PREFIX;
     }
 
+	@Override
+	public AsciiBuffer getDomain() {
+		return Router.TEMP_QUEUE_DOMAIN;
+	}
+
 }

Modified: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQTempTopic.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQTempTopic.java?rev=786177&r1=786176&r2=786177&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQTempTopic.java (original)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQTempTopic.java Thu Jun 18 17:31:06 2009
@@ -19,6 +19,9 @@
 import javax.jms.JMSException;
 import javax.jms.TemporaryTopic;
 
+import org.apache.activemq.apollo.broker.Router;
+import org.apache.activemq.protobuf.AsciiBuffer;
+
 /**
  * @openwire:marshaller code="103"
  * @version $Revision: 1.6 $
@@ -58,4 +61,9 @@
     protected String getQualifiedPrefix() {
         return TEMP_TOPIC_QUALIFED_PREFIX;
     }
+
+	@Override
+	public AsciiBuffer getDomain() {
+		return Router.TEMP_TOPIC_DOMAIN;
+	}
 }

Modified: activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQTopic.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQTopic.java?rev=786177&r1=786176&r2=786177&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQTopic.java (original)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/main/java/org/apache/activemq/command/ActiveMQTopic.java Thu Jun 18 17:31:06 2009
@@ -19,6 +19,9 @@
 import javax.jms.JMSException;
 import javax.jms.Topic;
 
+import org.apache.activemq.apollo.broker.Router;
+import org.apache.activemq.protobuf.AsciiBuffer;
+
 /**
  * @org.apache.xbean.XBean element="topic" description="An ActiveMQ Topic
  *                         Destination"
@@ -57,4 +60,9 @@
         return TOPIC_QUALIFIED_PREFIX;
     }
 
+	@Override
+	public AsciiBuffer getDomain() {
+		return Router.TOPIC_DOMAIN;
+	}
+
 }

Modified: activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/legacy/openwireprotocol/BrokerTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/legacy/openwireprotocol/BrokerTest.java?rev=786177&r1=786176&r2=786177&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/legacy/openwireprotocol/BrokerTest.java (original)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/legacy/openwireprotocol/BrokerTest.java Thu Jun 18 17:31:06 2009
@@ -44,6 +44,74 @@
     public boolean durableConsumer;
     protected static final int MAX_NULL_WAIT=500;
 
+    public void initCombosForTestCompositeSend() {
+        addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT),
+                                                           Integer.valueOf(DeliveryMode.PERSISTENT)});
+        addCombinationValues("destinationType", new Object[] {Byte.valueOf(ActiveMQDestination.QUEUE_TYPE),
+                                                              Byte.valueOf(ActiveMQDestination.TOPIC_TYPE)});
+    }
+
+    public void testCompositeSend() throws Exception {
+
+        // Setup a first connection
+        StubConnection connection1 = createConnection();
+        ConnectionInfo connectionInfo1 = createConnectionInfo();
+        SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
+        ProducerInfo producerInfo1 = createProducerInfo(sessionInfo1);
+        connection1.send(connectionInfo1);
+        connection1.send(sessionInfo1);
+        connection1.send(producerInfo1);
+
+        ActiveMQDestination destinationA = ActiveMQDestination.createDestination("A", destinationType);
+        ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destinationA);
+        consumerInfo1.setRetroactive(true);
+        consumerInfo1.setPrefetchSize(100);
+        connection1.request(consumerInfo1);
+
+        // Setup a second connection
+        StubConnection connection2 = createConnection();
+        ConnectionInfo connectionInfo2 = createConnectionInfo();
+        SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2);
+        connection2.send(connectionInfo2);
+        connection2.send(sessionInfo2);
+
+        ActiveMQDestination destinationB = ActiveMQDestination.createDestination("B", destinationType);
+        ConsumerInfo consumerInfo2 = createConsumerInfo(sessionInfo2, destinationB);
+        consumerInfo2.setRetroactive(true);
+        consumerInfo2.setPrefetchSize(100);
+        connection2.request(consumerInfo2);
+
+        // Send the messages to the composite destination.
+        ActiveMQDestination compositeDestination = ActiveMQDestination.createDestination("A,B",
+                                                                                         destinationType);
+        for (int i = 0; i < 4; i++) {
+            connection1.request(createMessage(producerInfo1, compositeDestination, deliveryMode));
+        }
+
+        // The messages should have been delivered to both the A and B
+        // destination.
+        for (int i = 0; i < 4; i++) {
+            Message m1 = receiveMessage(connection1);
+            Message m2 = receiveMessage(connection2);
+
+            assertNotNull(m1);
+            assertNotNull(m2);
+
+            assertEquals(m1.getMessageId(), m2.getMessageId());
+            assertEquals(compositeDestination, m1.getOriginalDestination());
+            assertEquals(compositeDestination, m2.getOriginalDestination());
+
+            connection1.request(createAck(consumerInfo1, m1, 1, MessageAck.STANDARD_ACK_TYPE));
+            connection2.request(createAck(consumerInfo2, m2, 1, MessageAck.STANDARD_ACK_TYPE));
+
+        }
+
+        assertNoMessagesLeft(connection1);
+        assertNoMessagesLeft(connection2);
+
+        connection1.send(closeConnectionInfo(connectionInfo1));
+        connection2.send(closeConnectionInfo(connectionInfo2));
+    }
 
     public void initCombosForTestQueueOnlyOnceDeliveryWith2Consumers() {
         addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT),
@@ -331,167 +399,6 @@
         connection.send(closeConnectionInfo(connectionInfo));
     }
 
-    public void initCombosForTestTransactedAckWithPrefetchOfOne() {
-        addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT),
-                                                           Integer.valueOf(DeliveryMode.PERSISTENT)});
-        addCombinationValues("destinationType",
-                             new Object[] {Byte.valueOf(ActiveMQDestination.QUEUE_TYPE),
-                                           Byte.valueOf(ActiveMQDestination.TOPIC_TYPE),
-                                           Byte.valueOf(ActiveMQDestination.TEMP_QUEUE_TYPE),
-                                           Byte.valueOf(ActiveMQDestination.TEMP_TOPIC_TYPE)});
-    }
-
-    public void testTransactedAckWithPrefetchOfOne() throws Exception {
-
-        // Setup a first connection
-        StubConnection connection1 = createConnection();
-        ConnectionInfo connectionInfo1 = createConnectionInfo();
-        SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
-        ProducerInfo producerInfo1 = createProducerInfo(sessionInfo1);
-        connection1.send(connectionInfo1);
-        connection1.send(sessionInfo1);
-        connection1.send(producerInfo1);
-
-        destination = createDestinationInfo(connection1, connectionInfo1, destinationType);
-
-        ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination);
-        consumerInfo1.setPrefetchSize(1);
-        connection1.send(consumerInfo1);
-
-        // Send the messages
-        for (int i = 0; i < 4; i++) {
-            Message message = createMessage(producerInfo1, destination, deliveryMode);
-            connection1.send(message);
-        }
-
-       
-
-        // Now get the messages.
-        for (int i = 0; i < 4; i++) {
-            // Begin the transaction.
-            LocalTransactionId txid = createLocalTransaction(sessionInfo1);
-            connection1.send(createBeginTransaction(connectionInfo1, txid));
-            Message m1 = receiveMessage(connection1);
-            assertNotNull(m1);
-            MessageAck ack = createAck(consumerInfo1, m1, 1, MessageAck.STANDARD_ACK_TYPE);
-            ack.setTransactionId(txid);
-            connection1.send(ack);
-         // Commit the transaction.
-            connection1.send(createCommitTransaction1Phase(connectionInfo1, txid));
-        }
-        assertNoMessagesLeft(connection1);
-    }
-
-    public void initCombosForTestTransactedSend() {
-        addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT),
-                                                           Integer.valueOf(DeliveryMode.PERSISTENT)});
-        addCombinationValues("destinationType",
-                             new Object[] {Byte.valueOf(ActiveMQDestination.QUEUE_TYPE),
-                                           Byte.valueOf(ActiveMQDestination.TOPIC_TYPE),
-                                           Byte.valueOf(ActiveMQDestination.TEMP_QUEUE_TYPE),
-                                           Byte.valueOf(ActiveMQDestination.TEMP_TOPIC_TYPE)});
-    }
-
-    public void testTransactedSend() throws Exception {
-
-        // Setup a first connection
-        StubConnection connection1 = createConnection();
-        ConnectionInfo connectionInfo1 = createConnectionInfo();
-        SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
-        ProducerInfo producerInfo1 = createProducerInfo(sessionInfo1);
-        connection1.send(connectionInfo1);
-        connection1.send(sessionInfo1);
-        connection1.send(producerInfo1);
-
-        destination = createDestinationInfo(connection1, connectionInfo1, destinationType);
-
-        ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination);
-        consumerInfo1.setPrefetchSize(100);
-        connection1.send(consumerInfo1);
-
-        // Begin the transaction.
-        LocalTransactionId txid = createLocalTransaction(sessionInfo1);
-        connection1.send(createBeginTransaction(connectionInfo1, txid));
-
-        // Send the messages
-        for (int i = 0; i < 4; i++) {
-            Message message = createMessage(producerInfo1, destination, deliveryMode);
-            message.setTransactionId(txid);
-            connection1.request(message);
-        }
-
-        // The point of this test is that message should not be delivered until
-        // send is committed.
-        assertNull(receiveMessage(connection1,MAX_NULL_WAIT));
-
-        // Commit the transaction.
-        connection1.send(createCommitTransaction1Phase(connectionInfo1, txid));
-
-        // Now get the messages.
-        for (int i = 0; i < 4; i++) {
-            Message m1 = receiveMessage(connection1);
-            assertNotNull(m1);
-        }
-
-        assertNoMessagesLeft(connection1);
-    }
-
-    public void initCombosForTestQueueTransactedAck() {
-        addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT),
-                                                           Integer.valueOf(DeliveryMode.PERSISTENT)});
-        addCombinationValues("destinationType",
-                             new Object[] {Byte.valueOf(ActiveMQDestination.QUEUE_TYPE),
-                                           Byte.valueOf(ActiveMQDestination.TEMP_QUEUE_TYPE)});
-    }
-
-    public void testQueueTransactedAck() throws Exception {
-
-        // Setup a first connection
-        StubConnection connection1 = createConnection();
-        ConnectionInfo connectionInfo1 = createConnectionInfo();
-        SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
-        ProducerInfo producerInfo1 = createProducerInfo(sessionInfo1);
-        connection1.send(connectionInfo1);
-        connection1.send(sessionInfo1);
-        connection1.send(producerInfo1);
-
-        destination = createDestinationInfo(connection1, connectionInfo1, destinationType);
-
-        ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination);
-        consumerInfo1.setPrefetchSize(100);
-        connection1.send(consumerInfo1);
-
-        // Send the messages
-        for (int i = 0; i < 4; i++) {
-            Message message = createMessage(producerInfo1, destination, deliveryMode);
-            connection1.send(message);
-        }
-
-        // Begin the transaction.
-        LocalTransactionId txid = createLocalTransaction(sessionInfo1);
-        connection1.send(createBeginTransaction(connectionInfo1, txid));
-
-        // Acknowledge the first 2 messages.
-        for (int i = 0; i < 2; i++) {
-            Message m1 = receiveMessage(connection1);
-            assertNotNull("m1 is null for index: " + i, m1);
-            MessageAck ack = createAck(consumerInfo1, m1, 1, MessageAck.STANDARD_ACK_TYPE);
-            ack.setTransactionId(txid);
-            connection1.request(ack);
-        }
-
-        // Commit the transaction.
-        connection1.send(createCommitTransaction1Phase(connectionInfo1, txid));
-
-        // The queue should now only have the remaining 2 messages
-        assertEquals(2, countMessagesInQueue(connection1, connectionInfo1, destination));
-    }
-
-    public void initCombosForTestConsumerCloseCausesRedelivery() {
-        addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT),
-                                                           Integer.valueOf(DeliveryMode.PERSISTENT)});
-        addCombinationValues("destination", new Object[] {new ActiveMQQueue("TEST")});
-    }
 
     public void testConsumerCloseCausesRedelivery() throws Exception {
 
@@ -711,52 +618,6 @@
         addCombinationValues("durableConsumer", new Object[] {Boolean.TRUE, Boolean.FALSE});
     }
 
-    public void testTopicRetroactiveConsumerSeeMessagesBeforeCreation() throws Exception {
-
-        ActiveMQDestination destination = new ActiveMQTopic("TEST");
-
-        // Setup a first connection
-        StubConnection connection1 = createConnection();
-        ConnectionInfo connectionInfo1 = createConnectionInfo();
-        connectionInfo1.setClientId("A");
-        SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
-        ProducerInfo producerInfo1 = createProducerInfo(sessionInfo1);
-        connection1.send(connectionInfo1);
-        connection1.send(sessionInfo1);
-        connection1.send(producerInfo1);
-
-        // Send the messages
-        Message m = createMessage(producerInfo1, destination, deliveryMode);
-        connection1.send(m);
-
-        // Create the durable subscription.
-        ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination);
-        if (durableConsumer) {
-            consumerInfo1.setSubscriptionName("test");
-        }
-        consumerInfo1.setPrefetchSize(100);
-        consumerInfo1.setRetroactive(true);
-        connection1.send(consumerInfo1);
-
-        connection1.send(createMessage(producerInfo1, destination, deliveryMode));
-        connection1.request(createMessage(producerInfo1, destination, deliveryMode));
-
-        // the behavior is VERY dependent on the recovery policy used.
-        // But the default broker settings try to make it as consistent as
-        // possible
-
-        // Subscription should see all messages sent.
-        Message m2 = receiveMessage(connection1);
-        assertNotNull(m2);
-        assertEquals(m.getMessageId(), m2.getMessageId());
-        for (int i = 0; i < 2; i++) {
-            m2 = receiveMessage(connection1);
-            assertNotNull(m2);
-        }
-
-        assertNoMessagesLeft(connection1);
-    }
-
     //
     // TODO: need to reimplement this since we don't fail when we send to a
     // non-existant
@@ -990,100 +851,32 @@
         connection1.send(producerInfo1);
 
         // setup the composite consumer.
-        ActiveMQDestination compositeDestination = ActiveMQDestination.createDestination("A,B",
-                                                                                         destinationType);
-        ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, compositeDestination);
-        consumerInfo1.setRetroactive(true);
-        consumerInfo1.setPrefetchSize(100);
-        connection1.send(consumerInfo1);
-
-        // Publish to the two destinations
-        ActiveMQDestination destinationA = ActiveMQDestination.createDestination("A", destinationType);
-        ActiveMQDestination destinationB = ActiveMQDestination.createDestination("B", destinationType);
-
-        // Send a message to each destination .
-        connection1.send(createMessage(producerInfo1, destinationA, deliveryMode));
-        connection1.send(createMessage(producerInfo1, destinationB, deliveryMode));
-
-        // The consumer should get both messages.
-        for (int i = 0; i < 2; i++) {
-            Message m1 = receiveMessage(connection1);
-            assertNotNull(m1);
-        }
-
-        assertNoMessagesLeft(connection1);
-        connection1.send(closeConnectionInfo(connectionInfo1));
-    }
-
-    public void initCombosForTestCompositeSend() {
-        addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT),
-                                                           Integer.valueOf(DeliveryMode.PERSISTENT)});
-        addCombinationValues("destinationType", new Object[] {Byte.valueOf(ActiveMQDestination.QUEUE_TYPE),
-                                                              Byte.valueOf(ActiveMQDestination.TOPIC_TYPE)});
-    }
-
-    public void testCompositeSend() throws Exception {
-
-        // Setup a first connection
-        StubConnection connection1 = createConnection();
-        ConnectionInfo connectionInfo1 = createConnectionInfo();
-        SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
-        ProducerInfo producerInfo1 = createProducerInfo(sessionInfo1);
-        connection1.send(connectionInfo1);
-        connection1.send(sessionInfo1);
-        connection1.send(producerInfo1);
-
-        ActiveMQDestination destinationA = ActiveMQDestination.createDestination("A", destinationType);
-        ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destinationA);
+        ActiveMQDestination compositeDestination = ActiveMQDestination.createDestination("A,B",
+                                                                                         destinationType);
+        ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, compositeDestination);
         consumerInfo1.setRetroactive(true);
         consumerInfo1.setPrefetchSize(100);
-        connection1.request(consumerInfo1);
-
-        // Setup a second connection
-        StubConnection connection2 = createConnection();
-        ConnectionInfo connectionInfo2 = createConnectionInfo();
-        SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2);
-        connection2.send(connectionInfo2);
-        connection2.send(sessionInfo2);
+        connection1.send(consumerInfo1);
 
+        // Publish to the two destinations
+        ActiveMQDestination destinationA = ActiveMQDestination.createDestination("A", destinationType);
         ActiveMQDestination destinationB = ActiveMQDestination.createDestination("B", destinationType);
-        ConsumerInfo consumerInfo2 = createConsumerInfo(sessionInfo2, destinationB);
-        consumerInfo2.setRetroactive(true);
-        consumerInfo2.setPrefetchSize(100);
-        connection2.request(consumerInfo2);
 
-        // Send the messages to the composite destination.
-        ActiveMQDestination compositeDestination = ActiveMQDestination.createDestination("A,B",
-                                                                                         destinationType);
-        for (int i = 0; i < 4; i++) {
-            connection1.request(createMessage(producerInfo1, compositeDestination, deliveryMode));
-        }
+        // Send a message to each destination.
+        connection1.send(createMessage(producerInfo1, destinationA, deliveryMode));
+        connection1.send(createMessage(producerInfo1, destinationB, deliveryMode));
 
-        // The messages should have been delivered to both the A and B
-        // destination.
-        for (int i = 0; i < 4; i++) {
+        // The consumer should get both messages.
+        for (int i = 0; i < 2; i++) {
             Message m1 = receiveMessage(connection1);
-            Message m2 = receiveMessage(connection2);
-
             assertNotNull(m1);
-            assertNotNull(m2);
-
-            assertEquals(m1.getMessageId(), m2.getMessageId());
-            assertEquals(compositeDestination, m1.getOriginalDestination());
-            assertEquals(compositeDestination, m2.getOriginalDestination());
-
-            connection1.request(createAck(consumerInfo1, m1, 1, MessageAck.STANDARD_ACK_TYPE));
-            connection2.request(createAck(consumerInfo2, m2, 1, MessageAck.STANDARD_ACK_TYPE));
-
         }
 
         assertNoMessagesLeft(connection1);
-        assertNoMessagesLeft(connection2);
-
         connection1.send(closeConnectionInfo(connectionInfo1));
-        connection2.send(closeConnectionInfo(connectionInfo2));
     }
 
+
     public void initCombosForTestConnectionCloseCascades() {
         addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT),
                                                            Integer.valueOf(DeliveryMode.PERSISTENT)});
@@ -1739,6 +1532,215 @@
         assertNotNull(m3);
         connection.request(createAck(consumerInfo, m3, 1, MessageAck.DELIVERED_ACK_TYPE));
     }
+    
+    public void initCombosForTestTransactedAckWithPrefetchOfOne() {
+        addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT),
+                                                           Integer.valueOf(DeliveryMode.PERSISTENT)});
+        addCombinationValues("destinationType",
+                             new Object[] {Byte.valueOf(ActiveMQDestination.QUEUE_TYPE),
+                                           Byte.valueOf(ActiveMQDestination.TOPIC_TYPE),
+                                           Byte.valueOf(ActiveMQDestination.TEMP_QUEUE_TYPE),
+                                           Byte.valueOf(ActiveMQDestination.TEMP_TOPIC_TYPE)});
+    }
+
+    public void testTransactedAckWithPrefetchOfOne() throws Exception {
+
+        // Setup a first connection
+        StubConnection connection1 = createConnection();
+        ConnectionInfo connectionInfo1 = createConnectionInfo();
+        SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
+        ProducerInfo producerInfo1 = createProducerInfo(sessionInfo1);
+        connection1.send(connectionInfo1);
+        connection1.send(sessionInfo1);
+        connection1.send(producerInfo1);
+
+        destination = createDestinationInfo(connection1, connectionInfo1, destinationType);
+
+        ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination);
+        consumerInfo1.setPrefetchSize(1);
+        connection1.send(consumerInfo1);
+
+        // Send the messages
+        for (int i = 0; i < 4; i++) {
+            Message message = createMessage(producerInfo1, destination, deliveryMode);
+            connection1.send(message);
+        }
+
+       
+
+        // Now get the messages.
+        for (int i = 0; i < 4; i++) {
+            // Begin the transaction.
+            LocalTransactionId txid = createLocalTransaction(sessionInfo1);
+            connection1.send(createBeginTransaction(connectionInfo1, txid));
+            Message m1 = receiveMessage(connection1);
+            assertNotNull(m1);
+            MessageAck ack = createAck(consumerInfo1, m1, 1, MessageAck.STANDARD_ACK_TYPE);
+            ack.setTransactionId(txid);
+            connection1.send(ack);
+            // Commit the transaction.
+            connection1.send(createCommitTransaction1Phase(connectionInfo1, txid));
+        }
+        assertNoMessagesLeft(connection1);
+    }
+
+    public void initCombosForTestTransactedSend() {
+        addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT),
+                                                           Integer.valueOf(DeliveryMode.PERSISTENT)});
+        addCombinationValues("destinationType",
+                             new Object[] {Byte.valueOf(ActiveMQDestination.QUEUE_TYPE),
+                                           Byte.valueOf(ActiveMQDestination.TOPIC_TYPE),
+                                           Byte.valueOf(ActiveMQDestination.TEMP_QUEUE_TYPE),
+                                           Byte.valueOf(ActiveMQDestination.TEMP_TOPIC_TYPE)});
+    }
+
+    public void testTransactedSend() throws Exception {
+
+        // Setup a first connection
+        StubConnection connection1 = createConnection();
+        ConnectionInfo connectionInfo1 = createConnectionInfo();
+        SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
+        ProducerInfo producerInfo1 = createProducerInfo(sessionInfo1);
+        connection1.send(connectionInfo1);
+        connection1.send(sessionInfo1);
+        connection1.send(producerInfo1);
+
+        destination = createDestinationInfo(connection1, connectionInfo1, destinationType);
+
+        ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination);
+        consumerInfo1.setPrefetchSize(100);
+        connection1.send(consumerInfo1);
+
+        // Begin the transaction.
+        LocalTransactionId txid = createLocalTransaction(sessionInfo1);
+        connection1.send(createBeginTransaction(connectionInfo1, txid));
+
+        // Send the messages
+        for (int i = 0; i < 4; i++) {
+            Message message = createMessage(producerInfo1, destination, deliveryMode);
+            message.setTransactionId(txid);
+            connection1.request(message);
+        }
+
+        // The point of this test is that messages should not be delivered until
+        // the send is committed.
+        assertNull(receiveMessage(connection1,MAX_NULL_WAIT));
+
+        // Commit the transaction.
+        connection1.send(createCommitTransaction1Phase(connectionInfo1, txid));
+
+        // Now get the messages.
+        for (int i = 0; i < 4; i++) {
+            Message m1 = receiveMessage(connection1);
+            assertNotNull(m1);
+        }
+
+        assertNoMessagesLeft(connection1);
+    }
+
+    public void initCombosForTestQueueTransactedAck() {
+        addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT),
+                                                           Integer.valueOf(DeliveryMode.PERSISTENT)});
+        addCombinationValues("destinationType",
+                             new Object[] {Byte.valueOf(ActiveMQDestination.QUEUE_TYPE),
+                                           Byte.valueOf(ActiveMQDestination.TEMP_QUEUE_TYPE)});
+    }
+
+    public void testQueueTransactedAck() throws Exception {
+
+        // Setup a first connection
+        StubConnection connection1 = createConnection();
+        ConnectionInfo connectionInfo1 = createConnectionInfo();
+        SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
+        ProducerInfo producerInfo1 = createProducerInfo(sessionInfo1);
+        connection1.send(connectionInfo1);
+        connection1.send(sessionInfo1);
+        connection1.send(producerInfo1);
+
+        destination = createDestinationInfo(connection1, connectionInfo1, destinationType);
+
+        ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination);
+        consumerInfo1.setPrefetchSize(100);
+        connection1.send(consumerInfo1);
+
+        // Send the messages
+        for (int i = 0; i < 4; i++) {
+            Message message = createMessage(producerInfo1, destination, deliveryMode);
+            connection1.send(message);
+        }
+
+        // Begin the transaction.
+        LocalTransactionId txid = createLocalTransaction(sessionInfo1);
+        connection1.send(createBeginTransaction(connectionInfo1, txid));
+
+        // Acknowledge the first 2 messages.
+        for (int i = 0; i < 2; i++) {
+            Message m1 = receiveMessage(connection1);
+            assertNotNull("m1 is null for index: " + i, m1);
+            MessageAck ack = createAck(consumerInfo1, m1, 1, MessageAck.STANDARD_ACK_TYPE);
+            ack.setTransactionId(txid);
+            connection1.request(ack);
+        }
+
+        // Commit the transaction.
+        connection1.send(createCommitTransaction1Phase(connectionInfo1, txid));
+
+        // The queue should now only have the remaining 2 messages
+        assertEquals(2, countMessagesInQueue(connection1, connectionInfo1, destination));
+    }
+    
+    public void testTopicRetroactiveConsumerSeeMessagesBeforeCreation() throws Exception {
+
+        ActiveMQDestination destination = new ActiveMQTopic("TEST");
+
+        // Setup a first connection
+        StubConnection connection1 = createConnection();
+        ConnectionInfo connectionInfo1 = createConnectionInfo();
+        connectionInfo1.setClientId("A");
+        SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
+        ProducerInfo producerInfo1 = createProducerInfo(sessionInfo1);
+        connection1.send(connectionInfo1);
+        connection1.send(sessionInfo1);
+        connection1.send(producerInfo1);
+
+        // Send the first message, before the consumer is created
+        Message m = createMessage(producerInfo1, destination, deliveryMode);
+        connection1.send(m);
+
+        // Create the subscription (durable only when durableConsumer is set).
+        ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination);
+        if (durableConsumer) {
+            consumerInfo1.setSubscriptionName("test");
+        }
+        consumerInfo1.setPrefetchSize(100);
+        consumerInfo1.setRetroactive(true);
+        connection1.send(consumerInfo1);
+
+        connection1.send(createMessage(producerInfo1, destination, deliveryMode));
+        connection1.request(createMessage(producerInfo1, destination, deliveryMode));
+
+        // the behavior is VERY dependent on the recovery policy used.
+        // But the default broker settings try to make it as consistent as
+        // possible
+
+        // Subscription should see all messages sent.
+        Message m2 = receiveMessage(connection1);
+        assertNotNull(m2);
+        assertEquals(m.getMessageId(), m2.getMessageId());
+        for (int i = 0; i < 2; i++) {
+            m2 = receiveMessage(connection1);
+            assertNotNull(m2);
+        }
+
+        assertNoMessagesLeft(connection1);
+    }
+
+    public void initCombosForTestConsumerCloseCausesRedelivery() {
+        addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT),
+                                                           Integer.valueOf(DeliveryMode.PERSISTENT)});
+        addCombinationValues("destination", new Object[] {new ActiveMQQueue("TEST")});
+    }
+    
 
     public static Test suite() {
         return suite(BrokerTest.class);

Modified: activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/protobuf/AsciiBuffer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/protobuf/AsciiBuffer.java?rev=786177&r1=786176&r2=786177&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/protobuf/AsciiBuffer.java (original)
+++ activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/protobuf/AsciiBuffer.java Thu Jun 18 17:31:06 2009
@@ -4,7 +4,8 @@
 final public class AsciiBuffer extends Buffer {
 
     private int hashCode;
-
+    private String value;
+    
     public AsciiBuffer(Buffer other) {
         super(other);
     }
@@ -17,8 +18,9 @@
         super(data);
     }
 
-    public AsciiBuffer(String input) {
-        super(encode(input));
+    public AsciiBuffer(String value) {
+        super(encode(value));
+        this.value = value;
     }
 
     public AsciiBuffer compact() {
@@ -30,7 +32,10 @@
 
     public String toString()
     {
-        return decode(this);
+    	if( value == null ) {
+    		value = decode(this); 
+    	}
+        return value; 
     }
 
     @Override