You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/06/23 00:25:23 UTC

svn commit: r787427 - in /activemq/sandbox/activemq-flow: activemq-all/src/test/java/org/apache/activemq/legacy/transport/ activemq-broker/ activemq-openwire/src/test/java/org/apache/activemq/legacy/openwireprotocol/ activemq-openwire/src/test/java/org...

Author: chirino
Date: Mon Jun 22 22:25:22 2009
New Revision: 787427

URL: http://svn.apache.org/viewvc?rev=787427&view=rev
Log:
Moved all the passing tests of the BrokerTest to a package that gets run by the maven build. 

Added:
    activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/legacy/openwireprotocol/BrokerTest.java
    activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/openwire/BrokerTest.java
      - copied, changed from r787420, activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/legacy/openwireprotocol/BrokerTest.java
    activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/openwire/MarshallingBrokerTest.java
      - copied, changed from r787419, activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/legacy/openwireprotocol/MarshallingBrokerTest.java
Removed:
    activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/legacy/openwireprotocol/MarshallingBrokerTest.java
Modified:
    activemq/sandbox/activemq-flow/activemq-all/src/test/java/org/apache/activemq/legacy/transport/TransportBrokerTestSupport.java
    activemq/sandbox/activemq-flow/activemq-broker/pom.xml
    activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/legacy/openwireprotocol/BrokerTestSupport.java

Modified: activemq/sandbox/activemq-flow/activemq-all/src/test/java/org/apache/activemq/legacy/transport/TransportBrokerTestSupport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-all/src/test/java/org/apache/activemq/legacy/transport/TransportBrokerTestSupport.java?rev=787427&r1=787426&r2=787427&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-all/src/test/java/org/apache/activemq/legacy/transport/TransportBrokerTestSupport.java (original)
+++ activemq/sandbox/activemq-flow/activemq-all/src/test/java/org/apache/activemq/legacy/transport/TransportBrokerTestSupport.java Mon Jun 22 22:25:22 2009
@@ -21,7 +21,7 @@
 
 import org.apache.activemq.apollo.broker.Broker;
 import org.apache.activemq.apollo.broker.BrokerFactory;
-import org.apache.activemq.legacy.openwireprotocol.BrokerTest;
+import org.apache.activemq.openwire.BrokerTest;
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.TransportFactory;
 

Modified: activemq/sandbox/activemq-flow/activemq-broker/pom.xml
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/pom.xml?rev=787427&r1=787426&r2=787427&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/pom.xml (original)
+++ activemq/sandbox/activemq-flow/activemq-broker/pom.xml Mon Jun 22 22:25:22 2009
@@ -226,6 +226,7 @@
         </dependency>
       </dependencies>
     </profile>
+    <!--
     <profile>
       <id>default-tools.jar</id>
       <activation>
@@ -245,6 +246,7 @@
         </dependency>
       </dependencies>
     </profile>
+  -->
   </profiles>
 
 </project>

Added: activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/legacy/openwireprotocol/BrokerTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/legacy/openwireprotocol/BrokerTest.java?rev=787427&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/legacy/openwireprotocol/BrokerTest.java (added)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/legacy/openwireprotocol/BrokerTest.java Mon Jun 22 22:25:22 2009
@@ -0,0 +1,329 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.legacy.openwireprotocol;
+
+import java.util.ArrayList;
+
+import javax.jms.DeliveryMode;
+
+import junit.framework.Test;
+
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.ConnectionInfo;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.LocalTransactionId;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.ProducerInfo;
+import org.apache.activemq.command.SessionInfo;
+
+public class BrokerTest extends BrokerTestSupport {
+
+    public ActiveMQDestination destination;
+    public int deliveryMode;
+    public int prefetch;
+    public byte destinationType;
+    public boolean durableConsumer;
+    protected static final int MAX_NULL_WAIT=500;
+    
+    public void initCombosForTestGroupedMessagesDeliveredToOnlyOneConsumer() {
+        addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT),
+                                                           Integer.valueOf(DeliveryMode.PERSISTENT)});
+    }
+
+    public void testGroupedMessagesDeliveredToOnlyOneConsumer() throws Exception {
+
+        ActiveMQDestination destination = new ActiveMQQueue("TEST");
+
+        // Setup a first connection
+        StubConnection connection1 = createConnection();
+        ConnectionInfo connectionInfo1 = createConnectionInfo();
+        SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
+        ProducerInfo producerInfo = createProducerInfo(sessionInfo1);
+        connection1.send(connectionInfo1);
+        connection1.send(sessionInfo1);
+        connection1.send(producerInfo);
+
+        ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination);
+        consumerInfo1.setPrefetchSize(1);
+        connection1.request(consumerInfo1);
+        
+        ArrayList<Message> msgs = new ArrayList<Message>(4);
+
+        // Send the messages.
+        for (int i = 0; i < 1; i++) {
+            Message message = createMessage(producerInfo, destination, deliveryMode);
+            message.setGroupID("TEST-GROUP");
+            message.setGroupSequence(i + 1);
+            msgs.add(i, message);
+            connection1.request(message);
+        }
+        
+        // Setup a second connection
+        StubConnection connection2 = createConnection();
+        ConnectionInfo connectionInfo2 = createConnectionInfo();
+        SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2);
+        connection2.send(connectionInfo2);
+        connection2.send(sessionInfo2);
+
+        ConsumerInfo consumerInfo2 = createConsumerInfo(sessionInfo2, destination);
+        consumerInfo2.setPrefetchSize(1);
+        connection2.request(consumerInfo2);
+        
+        // Send the messages.
+        for (int i = 1; i < 4; i++) {
+            Message message = createMessage(producerInfo, destination, deliveryMode);
+            message.setGroupID("TEST-GROUP");
+            message.setGroupSequence(i + 1);
+            msgs.add(i, message);
+            connection1.request(message);
+        }
+
+        // All the messages should have been sent down connection 1.. just get
+        // the first 3
+        for (int i = 0; i < 3; i++) {
+            Message m1 = receiveMessage(connection1);
+            assertEquals("Unexpected Message Receipt: " + m1, m1.getMessageId(), msgs.get(i).getMessageId());
+            connection1.send(createAck(consumerInfo1, m1, 1, MessageAck.STANDARD_ACK_TYPE));
+        }
+
+        // Close the first consumer.
+        connection1.request(closeConsumerInfo(consumerInfo1));
+
+        // The last messages should now go the the second consumer.
+        for (int i = 3; i < 4; i++) {
+            Message m1 = receiveMessage(connection2);
+            assertEquals("Unexpected Message Receipt: " + m1, m1.getMessageId(), msgs.get(i).getMessageId());
+            connection2.request(createAck(consumerInfo2, m1, 1, MessageAck.STANDARD_ACK_TYPE));
+        }
+
+        assertNoMessagesLeft(connection2);
+    }
+
+    public void initCombosForTestTransactedAckWithPrefetchOfOne() {
+        addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT),
+                                                           Integer.valueOf(DeliveryMode.PERSISTENT)});
+        addCombinationValues("destinationType",
+                             new Object[] {Byte.valueOf(ActiveMQDestination.QUEUE_TYPE),
+                                           Byte.valueOf(ActiveMQDestination.TOPIC_TYPE),
+                                           Byte.valueOf(ActiveMQDestination.TEMP_QUEUE_TYPE),
+                                           Byte.valueOf(ActiveMQDestination.TEMP_TOPIC_TYPE)});
+    }
+
+    public void testTransactedAckWithPrefetchOfOne() throws Exception {
+
+        // Setup a first connection
+        StubConnection connection1 = createConnection();
+        ConnectionInfo connectionInfo1 = createConnectionInfo();
+        SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
+        ProducerInfo producerInfo1 = createProducerInfo(sessionInfo1);
+        connection1.send(connectionInfo1);
+        connection1.send(sessionInfo1);
+        connection1.send(producerInfo1);
+
+        destination = createDestinationInfo(connection1, connectionInfo1, destinationType);
+
+        ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination);
+        consumerInfo1.setPrefetchSize(1);
+        connection1.send(consumerInfo1);
+
+        // Send the messages
+        for (int i = 0; i < 4; i++) {
+            Message message = createMessage(producerInfo1, destination, deliveryMode);
+            connection1.send(message);
+        }
+
+       
+
+        // Now get the messages.
+        for (int i = 0; i < 4; i++) {
+            // Begin the transaction.
+            LocalTransactionId txid = createLocalTransaction(sessionInfo1);
+            connection1.send(createBeginTransaction(connectionInfo1, txid));
+            Message m1 = receiveMessage(connection1);
+            assertNotNull(m1);
+            MessageAck ack = createAck(consumerInfo1, m1, 1, MessageAck.STANDARD_ACK_TYPE);
+            ack.setTransactionId(txid);
+            connection1.send(ack);
+         // Commit the transaction.
+            connection1.send(createCommitTransaction1Phase(connectionInfo1, txid));
+        }
+        assertNoMessagesLeft(connection1);
+    }
+
+    public void initCombosForTestTransactedSend() {
+        addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT),
+                                                           Integer.valueOf(DeliveryMode.PERSISTENT)});
+        addCombinationValues("destinationType",
+                             new Object[] {Byte.valueOf(ActiveMQDestination.QUEUE_TYPE),
+                                           Byte.valueOf(ActiveMQDestination.TOPIC_TYPE),
+                                           Byte.valueOf(ActiveMQDestination.TEMP_QUEUE_TYPE),
+                                           Byte.valueOf(ActiveMQDestination.TEMP_TOPIC_TYPE)});
+    }
+
+    public void testTransactedSend() throws Exception {
+
+        // Setup a first connection
+        StubConnection connection1 = createConnection();
+        ConnectionInfo connectionInfo1 = createConnectionInfo();
+        SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
+        ProducerInfo producerInfo1 = createProducerInfo(sessionInfo1);
+        connection1.send(connectionInfo1);
+        connection1.send(sessionInfo1);
+        connection1.send(producerInfo1);
+
+        destination = createDestinationInfo(connection1, connectionInfo1, destinationType);
+
+        ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination);
+        consumerInfo1.setPrefetchSize(100);
+        connection1.send(consumerInfo1);
+
+        // Begin the transaction.
+        LocalTransactionId txid = createLocalTransaction(sessionInfo1);
+        connection1.send(createBeginTransaction(connectionInfo1, txid));
+
+        // Send the messages
+        for (int i = 0; i < 4; i++) {
+            Message message = createMessage(producerInfo1, destination, deliveryMode);
+            message.setTransactionId(txid);
+            connection1.request(message);
+        }
+
+        // The point of this test is that message should not be delivered until
+        // send is committed.
+        assertNull(receiveMessage(connection1,MAX_NULL_WAIT));
+
+        // Commit the transaction.
+        connection1.send(createCommitTransaction1Phase(connectionInfo1, txid));
+
+        // Now get the messages.
+        for (int i = 0; i < 4; i++) {
+            Message m1 = receiveMessage(connection1);
+            assertNotNull(m1);
+        }
+
+        assertNoMessagesLeft(connection1);
+    }
+
+    public void initCombosForTestQueueTransactedAck() {
+        addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT),
+                                                           Integer.valueOf(DeliveryMode.PERSISTENT)});
+        addCombinationValues("destinationType",
+                             new Object[] {Byte.valueOf(ActiveMQDestination.QUEUE_TYPE),
+                                           Byte.valueOf(ActiveMQDestination.TEMP_QUEUE_TYPE)});
+    }
+
+    public void testQueueTransactedAck() throws Exception {
+
+        // Setup a first connection
+        StubConnection connection1 = createConnection();
+        ConnectionInfo connectionInfo1 = createConnectionInfo();
+        SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
+        ProducerInfo producerInfo1 = createProducerInfo(sessionInfo1);
+        connection1.send(connectionInfo1);
+        connection1.send(sessionInfo1);
+        connection1.send(producerInfo1);
+
+        destination = createDestinationInfo(connection1, connectionInfo1, destinationType);
+
+        ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination);
+        consumerInfo1.setPrefetchSize(100);
+        connection1.send(consumerInfo1);
+
+        // Send the messages
+        for (int i = 0; i < 4; i++) {
+            Message message = createMessage(producerInfo1, destination, deliveryMode);
+            connection1.send(message);
+        }
+
+        // Begin the transaction.
+        LocalTransactionId txid = createLocalTransaction(sessionInfo1);
+        connection1.send(createBeginTransaction(connectionInfo1, txid));
+
+        // Acknowledge the first 2 messages.
+        for (int i = 0; i < 2; i++) {
+            Message m1 = receiveMessage(connection1);
+            assertNotNull("m1 is null for index: " + i, m1);
+            MessageAck ack = createAck(consumerInfo1, m1, 1, MessageAck.STANDARD_ACK_TYPE);
+            ack.setTransactionId(txid);
+            connection1.request(ack);
+        }
+
+        // Commit the transaction.
+        connection1.send(createCommitTransaction1Phase(connectionInfo1, txid));
+
+        // The queue should now only have the remaining 2 messages
+        assertEquals(2, countMessagesInQueue(connection1, connectionInfo1, destination));
+    }
+    
+    public void testTopicRetroactiveConsumerSeeMessagesBeforeCreation() throws Exception {
+
+        ActiveMQDestination destination = new ActiveMQTopic("TEST");
+
+        // Setup a first connection
+        StubConnection connection1 = createConnection();
+        ConnectionInfo connectionInfo1 = createConnectionInfo();
+        connectionInfo1.setClientId("A");
+        SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
+        ProducerInfo producerInfo1 = createProducerInfo(sessionInfo1);
+        connection1.send(connectionInfo1);
+        connection1.send(sessionInfo1);
+        connection1.send(producerInfo1);
+
+        // Send the messages
+        Message m = createMessage(producerInfo1, destination, deliveryMode);
+        connection1.send(m);
+
+        // Create the durable subscription.
+        ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination);
+        if (durableConsumer) {
+            consumerInfo1.setSubscriptionName("test");
+        }
+        consumerInfo1.setPrefetchSize(100);
+        consumerInfo1.setRetroactive(true);
+        connection1.send(consumerInfo1);
+
+        connection1.send(createMessage(producerInfo1, destination, deliveryMode));
+        connection1.request(createMessage(producerInfo1, destination, deliveryMode));
+
+        // the behavior is VERY dependent on the recovery policy used.
+        // But the default broker settings try to make it as consistent as
+        // possible
+
+        // Subscription should see all messages sent.
+        Message m2 = receiveMessage(connection1);
+        assertNotNull(m2);
+        assertEquals(m.getMessageId(), m2.getMessageId());
+        for (int i = 0; i < 2; i++) {
+            m2 = receiveMessage(connection1);
+            assertNotNull(m2);
+        }
+
+        assertNoMessagesLeft(connection1);
+    }
+
+    public static Test suite() {
+        return suite(BrokerTest.class);
+    }
+
+    public static void main(String[] args) {
+        junit.textui.TestRunner.run(suite());
+    }
+
+}

Modified: activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/legacy/openwireprotocol/BrokerTestSupport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/legacy/openwireprotocol/BrokerTestSupport.java?rev=787427&r1=787426&r2=787427&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/legacy/openwireprotocol/BrokerTestSupport.java (original)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/legacy/openwireprotocol/BrokerTestSupport.java Mon Jun 22 22:25:22 2009
@@ -68,7 +68,7 @@
     protected int tempDestGenerator;
 
     protected int maxWait = 4000;
-    String PIPE_URI = "pipe://broker";
+    protected String PIPE_URI = "pipe://broker";
 	
     private ArrayList<StubConnection> connections = new ArrayList<StubConnection>();
 
@@ -81,10 +81,14 @@
 
     protected Broker createBroker() throws Exception {
     	Broker broker = BrokerFactory.createBroker(new URI("jaxb:classpath:non-persistent-activemq.xml"));
-    	broker.addTransportServer(TransportFactory.bind(new URI(PIPE_URI)));
+    	broker.addTransportServer(TransportFactory.bind(new URI(getConnectURI())));
         return broker;
     }
 
+    protected String getBindURI() {
+        return PIPE_URI;
+    }
+
     protected void tearDown() throws Exception {
         for (Iterator<StubConnection> iter = connections.iterator(); iter.hasNext();) {
             StubConnection connection = iter.next();
@@ -303,9 +307,13 @@
     }
 
     protected Transport createTransport() throws URISyntaxException, Exception {
-		return TransportFactory.connect(new URI(PIPE_URI));
+		return TransportFactory.connect(new URI(getConnectURI()));
 	}
 
+    protected String getConnectURI() {
+        return getBindURI();
+    }
+
 	/**
      * @param connection
      * @return

Copied: activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/openwire/BrokerTest.java (from r787420, activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/legacy/openwireprotocol/BrokerTest.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/openwire/BrokerTest.java?p2=activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/openwire/BrokerTest.java&p1=activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/legacy/openwireprotocol/BrokerTest.java&r1=787420&r2=787427&rev=787427&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/legacy/openwireprotocol/BrokerTest.java (original)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/openwire/BrokerTest.java Mon Jun 22 22:25:22 2009
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activemq.legacy.openwireprotocol;
+package org.apache.activemq.openwire;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -29,11 +29,12 @@
 import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.command.ConnectionInfo;
 import org.apache.activemq.command.ConsumerInfo;
-import org.apache.activemq.command.LocalTransactionId;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.ProducerInfo;
 import org.apache.activemq.command.SessionInfo;
+import org.apache.activemq.legacy.openwireprotocol.BrokerTestSupport;
+import org.apache.activemq.legacy.openwireprotocol.StubConnection;
 
 public class BrokerTest extends BrokerTestSupport {
 
@@ -498,6 +499,11 @@
         connection.send(closeConnectionInfo(connectionInfo));
     }
 
+    public void initCombosForTestConsumerCloseCausesRedelivery() {
+        addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT),
+                                                           Integer.valueOf(DeliveryMode.PERSISTENT)});
+        addCombinationValues("destination", new Object[] {new ActiveMQQueue("TEST")});
+    }
 
     public void testConsumerCloseCausesRedelivery() throws Exception {
 
@@ -604,80 +610,6 @@
         assertNoMessagesLeft(connection2);
     }
 
-    public void initCombosForTestGroupedMessagesDeliveredToOnlyOneConsumer() {
-        addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT),
-                                                           Integer.valueOf(DeliveryMode.PERSISTENT)});
-    }
-
-    public void testGroupedMessagesDeliveredToOnlyOneConsumer() throws Exception {
-
-        ActiveMQDestination destination = new ActiveMQQueue("TEST");
-
-        // Setup a first connection
-        StubConnection connection1 = createConnection();
-        ConnectionInfo connectionInfo1 = createConnectionInfo();
-        SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
-        ProducerInfo producerInfo = createProducerInfo(sessionInfo1);
-        connection1.send(connectionInfo1);
-        connection1.send(sessionInfo1);
-        connection1.send(producerInfo);
-
-        ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination);
-        consumerInfo1.setPrefetchSize(1);
-        connection1.request(consumerInfo1);
-        
-        ArrayList<Message> msgs = new ArrayList<Message>(4);
-
-        // Send the messages.
-        for (int i = 0; i < 1; i++) {
-            Message message = createMessage(producerInfo, destination, deliveryMode);
-            message.setGroupID("TEST-GROUP");
-            message.setGroupSequence(i + 1);
-            msgs.add(i, message);
-            connection1.request(message);
-        }
-        
-        // Setup a second connection
-        StubConnection connection2 = createConnection();
-        ConnectionInfo connectionInfo2 = createConnectionInfo();
-        SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2);
-        connection2.send(connectionInfo2);
-        connection2.send(sessionInfo2);
-
-        ConsumerInfo consumerInfo2 = createConsumerInfo(sessionInfo2, destination);
-        consumerInfo2.setPrefetchSize(1);
-        connection2.request(consumerInfo2);
-        
-        // Send the messages.
-        for (int i = 1; i < 4; i++) {
-            Message message = createMessage(producerInfo, destination, deliveryMode);
-            message.setGroupID("TEST-GROUP");
-            message.setGroupSequence(i + 1);
-            msgs.add(i, message);
-            connection1.request(message);
-        }
-
-        // All the messages should have been sent down connection 1.. just get
-        // the first 3
-        for (int i = 0; i < 3; i++) {
-            Message m1 = receiveMessage(connection1);
-            assertEquals("Unexpected Message Receipt: " + m1, m1.getMessageId(), msgs.get(i).getMessageId());
-            connection1.send(createAck(consumerInfo1, m1, 1, MessageAck.STANDARD_ACK_TYPE));
-        }
-
-        // Close the first consumer.
-        connection1.request(closeConsumerInfo(consumerInfo1));
-
-        // The last messages should now go the the second consumer.
-        for (int i = 3; i < 4; i++) {
-            Message m1 = receiveMessage(connection2);
-            assertEquals("Unexpected Message Receipt: " + m1, m1.getMessageId(), msgs.get(i).getMessageId());
-            connection2.request(createAck(consumerInfo2, m1, 1, MessageAck.STANDARD_ACK_TYPE));
-        }
-
-        assertNoMessagesLeft(connection2);
-    }
-
     public void initCombosForTestTopicConsumerOnlySeeMessagesAfterCreation() {
         addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT),
                                                            Integer.valueOf(DeliveryMode.PERSISTENT)});
@@ -1171,7 +1103,7 @@
         ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination);
         consumerInfo1.setRetroactive(true);
         consumerInfo1.setPrefetchSize(100);
-        connection1.send(consumerInfo1);
+        connection1.request(consumerInfo1);
 
         // Setup a second connection
         StubConnection connection2 = createConnection();
@@ -1182,7 +1114,7 @@
         consumerInfo2.setPrefetchSize(100);
         connection2.send(connectionInfo2);
         connection2.send(sessionInfo2);
-        connection2.send(consumerInfo2);
+        connection2.request(consumerInfo2);
 
         // Send the messages
         connection1.send(createMessage(producerInfo1, destination, deliveryMode));
@@ -1545,215 +1477,6 @@
         connection.request(createAck(consumerInfo, m3, 1, MessageAck.DELIVERED_ACK_TYPE));
     }
     
-    public void initCombosForTestTransactedAckWithPrefetchOfOne() {
-        addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT),
-                                                           Integer.valueOf(DeliveryMode.PERSISTENT)});
-        addCombinationValues("destinationType",
-                             new Object[] {Byte.valueOf(ActiveMQDestination.QUEUE_TYPE),
-                                           Byte.valueOf(ActiveMQDestination.TOPIC_TYPE),
-                                           Byte.valueOf(ActiveMQDestination.TEMP_QUEUE_TYPE),
-                                           Byte.valueOf(ActiveMQDestination.TEMP_TOPIC_TYPE)});
-    }
-
-    public void testTransactedAckWithPrefetchOfOne() throws Exception {
-
-        // Setup a first connection
-        StubConnection connection1 = createConnection();
-        ConnectionInfo connectionInfo1 = createConnectionInfo();
-        SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
-        ProducerInfo producerInfo1 = createProducerInfo(sessionInfo1);
-        connection1.send(connectionInfo1);
-        connection1.send(sessionInfo1);
-        connection1.send(producerInfo1);
-
-        destination = createDestinationInfo(connection1, connectionInfo1, destinationType);
-
-        ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination);
-        consumerInfo1.setPrefetchSize(1);
-        connection1.send(consumerInfo1);
-
-        // Send the messages
-        for (int i = 0; i < 4; i++) {
-            Message message = createMessage(producerInfo1, destination, deliveryMode);
-            connection1.send(message);
-        }
-
-       
-
-        // Now get the messages.
-        for (int i = 0; i < 4; i++) {
-            // Begin the transaction.
-            LocalTransactionId txid = createLocalTransaction(sessionInfo1);
-            connection1.send(createBeginTransaction(connectionInfo1, txid));
-            Message m1 = receiveMessage(connection1);
-            assertNotNull(m1);
-            MessageAck ack = createAck(consumerInfo1, m1, 1, MessageAck.STANDARD_ACK_TYPE);
-            ack.setTransactionId(txid);
-            connection1.send(ack);
-         // Commit the transaction.
-            connection1.send(createCommitTransaction1Phase(connectionInfo1, txid));
-        }
-        assertNoMessagesLeft(connection1);
-    }
-
-    public void initCombosForTestTransactedSend() {
-        addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT),
-                                                           Integer.valueOf(DeliveryMode.PERSISTENT)});
-        addCombinationValues("destinationType",
-                             new Object[] {Byte.valueOf(ActiveMQDestination.QUEUE_TYPE),
-                                           Byte.valueOf(ActiveMQDestination.TOPIC_TYPE),
-                                           Byte.valueOf(ActiveMQDestination.TEMP_QUEUE_TYPE),
-                                           Byte.valueOf(ActiveMQDestination.TEMP_TOPIC_TYPE)});
-    }
-
-    public void testTransactedSend() throws Exception {
-
-        // Setup a first connection
-        StubConnection connection1 = createConnection();
-        ConnectionInfo connectionInfo1 = createConnectionInfo();
-        SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
-        ProducerInfo producerInfo1 = createProducerInfo(sessionInfo1);
-        connection1.send(connectionInfo1);
-        connection1.send(sessionInfo1);
-        connection1.send(producerInfo1);
-
-        destination = createDestinationInfo(connection1, connectionInfo1, destinationType);
-
-        ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination);
-        consumerInfo1.setPrefetchSize(100);
-        connection1.send(consumerInfo1);
-
-        // Begin the transaction.
-        LocalTransactionId txid = createLocalTransaction(sessionInfo1);
-        connection1.send(createBeginTransaction(connectionInfo1, txid));
-
-        // Send the messages
-        for (int i = 0; i < 4; i++) {
-            Message message = createMessage(producerInfo1, destination, deliveryMode);
-            message.setTransactionId(txid);
-            connection1.request(message);
-        }
-
-        // The point of this test is that message should not be delivered until
-        // send is committed.
-        assertNull(receiveMessage(connection1,MAX_NULL_WAIT));
-
-        // Commit the transaction.
-        connection1.send(createCommitTransaction1Phase(connectionInfo1, txid));
-
-        // Now get the messages.
-        for (int i = 0; i < 4; i++) {
-            Message m1 = receiveMessage(connection1);
-            assertNotNull(m1);
-        }
-
-        assertNoMessagesLeft(connection1);
-    }
-
-    public void initCombosForTestQueueTransactedAck() {
-        addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT),
-                                                           Integer.valueOf(DeliveryMode.PERSISTENT)});
-        addCombinationValues("destinationType",
-                             new Object[] {Byte.valueOf(ActiveMQDestination.QUEUE_TYPE),
-                                           Byte.valueOf(ActiveMQDestination.TEMP_QUEUE_TYPE)});
-    }
-
-    public void testQueueTransactedAck() throws Exception {
-
-        // Setup a first connection
-        StubConnection connection1 = createConnection();
-        ConnectionInfo connectionInfo1 = createConnectionInfo();
-        SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
-        ProducerInfo producerInfo1 = createProducerInfo(sessionInfo1);
-        connection1.send(connectionInfo1);
-        connection1.send(sessionInfo1);
-        connection1.send(producerInfo1);
-
-        destination = createDestinationInfo(connection1, connectionInfo1, destinationType);
-
-        ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination);
-        consumerInfo1.setPrefetchSize(100);
-        connection1.send(consumerInfo1);
-
-        // Send the messages
-        for (int i = 0; i < 4; i++) {
-            Message message = createMessage(producerInfo1, destination, deliveryMode);
-            connection1.send(message);
-        }
-
-        // Begin the transaction.
-        LocalTransactionId txid = createLocalTransaction(sessionInfo1);
-        connection1.send(createBeginTransaction(connectionInfo1, txid));
-
-        // Acknowledge the first 2 messages.
-        for (int i = 0; i < 2; i++) {
-            Message m1 = receiveMessage(connection1);
-            assertNotNull("m1 is null for index: " + i, m1);
-            MessageAck ack = createAck(consumerInfo1, m1, 1, MessageAck.STANDARD_ACK_TYPE);
-            ack.setTransactionId(txid);
-            connection1.request(ack);
-        }
-
-        // Commit the transaction.
-        connection1.send(createCommitTransaction1Phase(connectionInfo1, txid));
-
-        // The queue should now only have the remaining 2 messages
-        assertEquals(2, countMessagesInQueue(connection1, connectionInfo1, destination));
-    }
-    
-    public void testTopicRetroactiveConsumerSeeMessagesBeforeCreation() throws Exception {
-
-        ActiveMQDestination destination = new ActiveMQTopic("TEST");
-
-        // Setup a first connection
-        StubConnection connection1 = createConnection();
-        ConnectionInfo connectionInfo1 = createConnectionInfo();
-        connectionInfo1.setClientId("A");
-        SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
-        ProducerInfo producerInfo1 = createProducerInfo(sessionInfo1);
-        connection1.send(connectionInfo1);
-        connection1.send(sessionInfo1);
-        connection1.send(producerInfo1);
-
-        // Send the messages
-        Message m = createMessage(producerInfo1, destination, deliveryMode);
-        connection1.send(m);
-
-        // Create the durable subscription.
-        ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination);
-        if (durableConsumer) {
-            consumerInfo1.setSubscriptionName("test");
-        }
-        consumerInfo1.setPrefetchSize(100);
-        consumerInfo1.setRetroactive(true);
-        connection1.send(consumerInfo1);
-
-        connection1.send(createMessage(producerInfo1, destination, deliveryMode));
-        connection1.request(createMessage(producerInfo1, destination, deliveryMode));
-
-        // the behavior is VERY dependent on the recovery policy used.
-        // But the default broker settings try to make it as consistent as
-        // possible
-
-        // Subscription should see all messages sent.
-        Message m2 = receiveMessage(connection1);
-        assertNotNull(m2);
-        assertEquals(m.getMessageId(), m2.getMessageId());
-        for (int i = 0; i < 2; i++) {
-            m2 = receiveMessage(connection1);
-            assertNotNull(m2);
-        }
-
-        assertNoMessagesLeft(connection1);
-    }
-
-    public void initCombosForTestConsumerCloseCausesRedelivery() {
-        addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT),
-                                                           Integer.valueOf(DeliveryMode.PERSISTENT)});
-        addCombinationValues("destination", new Object[] {new ActiveMQQueue("TEST")});
-    }
-    
-
     public static Test suite() {
         return suite(BrokerTest.class);
     }

Copied: activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/openwire/MarshallingBrokerTest.java (from r787419, activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/legacy/openwireprotocol/MarshallingBrokerTest.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/openwire/MarshallingBrokerTest.java?p2=activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/openwire/MarshallingBrokerTest.java&p1=activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/legacy/openwireprotocol/MarshallingBrokerTest.java&r1=787419&r2=787427&rev=787427&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/legacy/openwireprotocol/MarshallingBrokerTest.java (original)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/openwire/MarshallingBrokerTest.java Mon Jun 22 22:25:22 2009
@@ -14,12 +14,13 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activemq.legacy.openwireprotocol;
+package org.apache.activemq.openwire;
 
 import java.net.URI;
 
 import junit.framework.Test;
 
+import org.apache.activemq.legacy.openwireprotocol.StubConnection;
 import org.apache.activemq.openwire.OpenWireFormat;
 import org.apache.activemq.transport.TransportFactory;
 import org.apache.activemq.wireformat.WireFormat;
@@ -40,11 +41,13 @@
         OpenWireFormat wf2 = new OpenWireFormat();
         wf2.setCacheEnabled(true);
 
-        addCombinationValues("wireFormat", new Object[] {wf1, wf2, });
+        addCombinationValues("wireFormat", new Object[] {wf1, wf2});
     }
 
-    protected StubConnection createConnection() throws Exception {
-        return new StubConnection(TransportFactory.connect(new URI(PIPE_URI+"?marshall=true")));
+    
+    @Override
+    protected String getBindURI() {
+        return PIPE_URI+"?marshal=true";
     }
 
     public static Test suite() {