You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/06/23 00:25:23 UTC
svn commit: r787427 - in /activemq/sandbox/activemq-flow:
activemq-all/src/test/java/org/apache/activemq/legacy/transport/
activemq-broker/
activemq-openwire/src/test/java/org/apache/activemq/legacy/openwireprotocol/
activemq-openwire/src/test/java/org...
Author: chirino
Date: Mon Jun 22 22:25:22 2009
New Revision: 787427
URL: http://svn.apache.org/viewvc?rev=787427&view=rev
Log:
Moved all the passing tests of the BrokerTest to a package that gets run by the maven build.
Added:
activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/legacy/openwireprotocol/BrokerTest.java
activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/openwire/BrokerTest.java
- copied, changed from r787420, activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/legacy/openwireprotocol/BrokerTest.java
activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/openwire/MarshallingBrokerTest.java
- copied, changed from r787419, activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/legacy/openwireprotocol/MarshallingBrokerTest.java
Removed:
activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/legacy/openwireprotocol/MarshallingBrokerTest.java
Modified:
activemq/sandbox/activemq-flow/activemq-all/src/test/java/org/apache/activemq/legacy/transport/TransportBrokerTestSupport.java
activemq/sandbox/activemq-flow/activemq-broker/pom.xml
activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/legacy/openwireprotocol/BrokerTestSupport.java
Modified: activemq/sandbox/activemq-flow/activemq-all/src/test/java/org/apache/activemq/legacy/transport/TransportBrokerTestSupport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-all/src/test/java/org/apache/activemq/legacy/transport/TransportBrokerTestSupport.java?rev=787427&r1=787426&r2=787427&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-all/src/test/java/org/apache/activemq/legacy/transport/TransportBrokerTestSupport.java (original)
+++ activemq/sandbox/activemq-flow/activemq-all/src/test/java/org/apache/activemq/legacy/transport/TransportBrokerTestSupport.java Mon Jun 22 22:25:22 2009
@@ -21,7 +21,7 @@
import org.apache.activemq.apollo.broker.Broker;
import org.apache.activemq.apollo.broker.BrokerFactory;
-import org.apache.activemq.legacy.openwireprotocol.BrokerTest;
+import org.apache.activemq.openwire.BrokerTest;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportFactory;
Modified: activemq/sandbox/activemq-flow/activemq-broker/pom.xml
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/pom.xml?rev=787427&r1=787426&r2=787427&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/pom.xml (original)
+++ activemq/sandbox/activemq-flow/activemq-broker/pom.xml Mon Jun 22 22:25:22 2009
@@ -226,6 +226,7 @@
</dependency>
</dependencies>
</profile>
+ <!--
<profile>
<id>default-tools.jar</id>
<activation>
@@ -245,6 +246,7 @@
</dependency>
</dependencies>
</profile>
+ -->
</profiles>
</project>
Added: activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/legacy/openwireprotocol/BrokerTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/legacy/openwireprotocol/BrokerTest.java?rev=787427&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/legacy/openwireprotocol/BrokerTest.java (added)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/legacy/openwireprotocol/BrokerTest.java Mon Jun 22 22:25:22 2009
@@ -0,0 +1,329 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.legacy.openwireprotocol;
+
+import java.util.ArrayList;
+
+import javax.jms.DeliveryMode;
+
+import junit.framework.Test;
+
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.ConnectionInfo;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.LocalTransactionId;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.ProducerInfo;
+import org.apache.activemq.command.SessionInfo;
+
+public class BrokerTest extends BrokerTestSupport {
+
+ public ActiveMQDestination destination;
+ public int deliveryMode;
+ public int prefetch;
+ public byte destinationType;
+ public boolean durableConsumer;
+ protected static final int MAX_NULL_WAIT=500;
+
+ public void initCombosForTestGroupedMessagesDeliveredToOnlyOneConsumer() {
+ addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT),
+ Integer.valueOf(DeliveryMode.PERSISTENT)});
+ }
+
+ public void testGroupedMessagesDeliveredToOnlyOneConsumer() throws Exception {
+
+ ActiveMQDestination destination = new ActiveMQQueue("TEST");
+
+ // Setup a first connection
+ StubConnection connection1 = createConnection();
+ ConnectionInfo connectionInfo1 = createConnectionInfo();
+ SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
+ ProducerInfo producerInfo = createProducerInfo(sessionInfo1);
+ connection1.send(connectionInfo1);
+ connection1.send(sessionInfo1);
+ connection1.send(producerInfo);
+
+ ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination);
+ consumerInfo1.setPrefetchSize(1);
+ connection1.request(consumerInfo1);
+
+ ArrayList<Message> msgs = new ArrayList<Message>(4);
+
+ // Send the messages.
+ for (int i = 0; i < 1; i++) {
+ Message message = createMessage(producerInfo, destination, deliveryMode);
+ message.setGroupID("TEST-GROUP");
+ message.setGroupSequence(i + 1);
+ msgs.add(i, message);
+ connection1.request(message);
+ }
+
+ // Setup a second connection
+ StubConnection connection2 = createConnection();
+ ConnectionInfo connectionInfo2 = createConnectionInfo();
+ SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2);
+ connection2.send(connectionInfo2);
+ connection2.send(sessionInfo2);
+
+ ConsumerInfo consumerInfo2 = createConsumerInfo(sessionInfo2, destination);
+ consumerInfo2.setPrefetchSize(1);
+ connection2.request(consumerInfo2);
+
+ // Send the messages.
+ for (int i = 1; i < 4; i++) {
+ Message message = createMessage(producerInfo, destination, deliveryMode);
+ message.setGroupID("TEST-GROUP");
+ message.setGroupSequence(i + 1);
+ msgs.add(i, message);
+ connection1.request(message);
+ }
+
+ // All the messages should have been sent down connection 1.. just get
+ // the first 3
+ for (int i = 0; i < 3; i++) {
+ Message m1 = receiveMessage(connection1);
+ assertEquals("Unexpected Message Receipt: " + m1, m1.getMessageId(), msgs.get(i).getMessageId());
+ connection1.send(createAck(consumerInfo1, m1, 1, MessageAck.STANDARD_ACK_TYPE));
+ }
+
+ // Close the first consumer.
+ connection1.request(closeConsumerInfo(consumerInfo1));
+
+ // The last messages should now go the the second consumer.
+ for (int i = 3; i < 4; i++) {
+ Message m1 = receiveMessage(connection2);
+ assertEquals("Unexpected Message Receipt: " + m1, m1.getMessageId(), msgs.get(i).getMessageId());
+ connection2.request(createAck(consumerInfo2, m1, 1, MessageAck.STANDARD_ACK_TYPE));
+ }
+
+ assertNoMessagesLeft(connection2);
+ }
+
+ public void initCombosForTestTransactedAckWithPrefetchOfOne() {
+ addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT),
+ Integer.valueOf(DeliveryMode.PERSISTENT)});
+ addCombinationValues("destinationType",
+ new Object[] {Byte.valueOf(ActiveMQDestination.QUEUE_TYPE),
+ Byte.valueOf(ActiveMQDestination.TOPIC_TYPE),
+ Byte.valueOf(ActiveMQDestination.TEMP_QUEUE_TYPE),
+ Byte.valueOf(ActiveMQDestination.TEMP_TOPIC_TYPE)});
+ }
+
+ public void testTransactedAckWithPrefetchOfOne() throws Exception {
+
+ // Setup a first connection
+ StubConnection connection1 = createConnection();
+ ConnectionInfo connectionInfo1 = createConnectionInfo();
+ SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
+ ProducerInfo producerInfo1 = createProducerInfo(sessionInfo1);
+ connection1.send(connectionInfo1);
+ connection1.send(sessionInfo1);
+ connection1.send(producerInfo1);
+
+ destination = createDestinationInfo(connection1, connectionInfo1, destinationType);
+
+ ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination);
+ consumerInfo1.setPrefetchSize(1);
+ connection1.send(consumerInfo1);
+
+ // Send the messages
+ for (int i = 0; i < 4; i++) {
+ Message message = createMessage(producerInfo1, destination, deliveryMode);
+ connection1.send(message);
+ }
+
+
+
+ // Now get the messages.
+ for (int i = 0; i < 4; i++) {
+ // Begin the transaction.
+ LocalTransactionId txid = createLocalTransaction(sessionInfo1);
+ connection1.send(createBeginTransaction(connectionInfo1, txid));
+ Message m1 = receiveMessage(connection1);
+ assertNotNull(m1);
+ MessageAck ack = createAck(consumerInfo1, m1, 1, MessageAck.STANDARD_ACK_TYPE);
+ ack.setTransactionId(txid);
+ connection1.send(ack);
+ // Commit the transaction.
+ connection1.send(createCommitTransaction1Phase(connectionInfo1, txid));
+ }
+ assertNoMessagesLeft(connection1);
+ }
+
+ public void initCombosForTestTransactedSend() {
+ addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT),
+ Integer.valueOf(DeliveryMode.PERSISTENT)});
+ addCombinationValues("destinationType",
+ new Object[] {Byte.valueOf(ActiveMQDestination.QUEUE_TYPE),
+ Byte.valueOf(ActiveMQDestination.TOPIC_TYPE),
+ Byte.valueOf(ActiveMQDestination.TEMP_QUEUE_TYPE),
+ Byte.valueOf(ActiveMQDestination.TEMP_TOPIC_TYPE)});
+ }
+
+ public void testTransactedSend() throws Exception {
+
+ // Setup a first connection
+ StubConnection connection1 = createConnection();
+ ConnectionInfo connectionInfo1 = createConnectionInfo();
+ SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
+ ProducerInfo producerInfo1 = createProducerInfo(sessionInfo1);
+ connection1.send(connectionInfo1);
+ connection1.send(sessionInfo1);
+ connection1.send(producerInfo1);
+
+ destination = createDestinationInfo(connection1, connectionInfo1, destinationType);
+
+ ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination);
+ consumerInfo1.setPrefetchSize(100);
+ connection1.send(consumerInfo1);
+
+ // Begin the transaction.
+ LocalTransactionId txid = createLocalTransaction(sessionInfo1);
+ connection1.send(createBeginTransaction(connectionInfo1, txid));
+
+ // Send the messages
+ for (int i = 0; i < 4; i++) {
+ Message message = createMessage(producerInfo1, destination, deliveryMode);
+ message.setTransactionId(txid);
+ connection1.request(message);
+ }
+
+ // The point of this test is that message should not be delivered until
+ // send is committed.
+ assertNull(receiveMessage(connection1,MAX_NULL_WAIT));
+
+ // Commit the transaction.
+ connection1.send(createCommitTransaction1Phase(connectionInfo1, txid));
+
+ // Now get the messages.
+ for (int i = 0; i < 4; i++) {
+ Message m1 = receiveMessage(connection1);
+ assertNotNull(m1);
+ }
+
+ assertNoMessagesLeft(connection1);
+ }
+
+ public void initCombosForTestQueueTransactedAck() {
+ addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT),
+ Integer.valueOf(DeliveryMode.PERSISTENT)});
+ addCombinationValues("destinationType",
+ new Object[] {Byte.valueOf(ActiveMQDestination.QUEUE_TYPE),
+ Byte.valueOf(ActiveMQDestination.TEMP_QUEUE_TYPE)});
+ }
+
+ public void testQueueTransactedAck() throws Exception {
+
+ // Setup a first connection
+ StubConnection connection1 = createConnection();
+ ConnectionInfo connectionInfo1 = createConnectionInfo();
+ SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
+ ProducerInfo producerInfo1 = createProducerInfo(sessionInfo1);
+ connection1.send(connectionInfo1);
+ connection1.send(sessionInfo1);
+ connection1.send(producerInfo1);
+
+ destination = createDestinationInfo(connection1, connectionInfo1, destinationType);
+
+ ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination);
+ consumerInfo1.setPrefetchSize(100);
+ connection1.send(consumerInfo1);
+
+ // Send the messages
+ for (int i = 0; i < 4; i++) {
+ Message message = createMessage(producerInfo1, destination, deliveryMode);
+ connection1.send(message);
+ }
+
+ // Begin the transaction.
+ LocalTransactionId txid = createLocalTransaction(sessionInfo1);
+ connection1.send(createBeginTransaction(connectionInfo1, txid));
+
+ // Acknowledge the first 2 messages.
+ for (int i = 0; i < 2; i++) {
+ Message m1 = receiveMessage(connection1);
+ assertNotNull("m1 is null for index: " + i, m1);
+ MessageAck ack = createAck(consumerInfo1, m1, 1, MessageAck.STANDARD_ACK_TYPE);
+ ack.setTransactionId(txid);
+ connection1.request(ack);
+ }
+
+ // Commit the transaction.
+ connection1.send(createCommitTransaction1Phase(connectionInfo1, txid));
+
+ // The queue should now only have the remaining 2 messages
+ assertEquals(2, countMessagesInQueue(connection1, connectionInfo1, destination));
+ }
+
+ public void testTopicRetroactiveConsumerSeeMessagesBeforeCreation() throws Exception {
+
+ ActiveMQDestination destination = new ActiveMQTopic("TEST");
+
+ // Setup a first connection
+ StubConnection connection1 = createConnection();
+ ConnectionInfo connectionInfo1 = createConnectionInfo();
+ connectionInfo1.setClientId("A");
+ SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
+ ProducerInfo producerInfo1 = createProducerInfo(sessionInfo1);
+ connection1.send(connectionInfo1);
+ connection1.send(sessionInfo1);
+ connection1.send(producerInfo1);
+
+ // Send the messages
+ Message m = createMessage(producerInfo1, destination, deliveryMode);
+ connection1.send(m);
+
+ // Create the durable subscription.
+ ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination);
+ if (durableConsumer) {
+ consumerInfo1.setSubscriptionName("test");
+ }
+ consumerInfo1.setPrefetchSize(100);
+ consumerInfo1.setRetroactive(true);
+ connection1.send(consumerInfo1);
+
+ connection1.send(createMessage(producerInfo1, destination, deliveryMode));
+ connection1.request(createMessage(producerInfo1, destination, deliveryMode));
+
+ // the behavior is VERY dependent on the recovery policy used.
+ // But the default broker settings try to make it as consistent as
+ // possible
+
+ // Subscription should see all messages sent.
+ Message m2 = receiveMessage(connection1);
+ assertNotNull(m2);
+ assertEquals(m.getMessageId(), m2.getMessageId());
+ for (int i = 0; i < 2; i++) {
+ m2 = receiveMessage(connection1);
+ assertNotNull(m2);
+ }
+
+ assertNoMessagesLeft(connection1);
+ }
+
+ public static Test suite() {
+ return suite(BrokerTest.class);
+ }
+
+ public static void main(String[] args) {
+ junit.textui.TestRunner.run(suite());
+ }
+
+}
Modified: activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/legacy/openwireprotocol/BrokerTestSupport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/legacy/openwireprotocol/BrokerTestSupport.java?rev=787427&r1=787426&r2=787427&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/legacy/openwireprotocol/BrokerTestSupport.java (original)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/legacy/openwireprotocol/BrokerTestSupport.java Mon Jun 22 22:25:22 2009
@@ -68,7 +68,7 @@
protected int tempDestGenerator;
protected int maxWait = 4000;
- String PIPE_URI = "pipe://broker";
+ protected String PIPE_URI = "pipe://broker";
private ArrayList<StubConnection> connections = new ArrayList<StubConnection>();
@@ -81,10 +81,14 @@
protected Broker createBroker() throws Exception {
Broker broker = BrokerFactory.createBroker(new URI("jaxb:classpath:non-persistent-activemq.xml"));
- broker.addTransportServer(TransportFactory.bind(new URI(PIPE_URI)));
+ broker.addTransportServer(TransportFactory.bind(new URI(getConnectURI())));
return broker;
}
+ protected String getBindURI() {
+ return PIPE_URI;
+ }
+
protected void tearDown() throws Exception {
for (Iterator<StubConnection> iter = connections.iterator(); iter.hasNext();) {
StubConnection connection = iter.next();
@@ -303,9 +307,13 @@
}
protected Transport createTransport() throws URISyntaxException, Exception {
- return TransportFactory.connect(new URI(PIPE_URI));
+ return TransportFactory.connect(new URI(getConnectURI()));
}
+ protected String getConnectURI() {
+ return getBindURI();
+ }
+
/**
* @param connection
* @return
Copied: activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/openwire/BrokerTest.java (from r787420, activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/legacy/openwireprotocol/BrokerTest.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/openwire/BrokerTest.java?p2=activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/openwire/BrokerTest.java&p1=activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/legacy/openwireprotocol/BrokerTest.java&r1=787420&r2=787427&rev=787427&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/legacy/openwireprotocol/BrokerTest.java (original)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/openwire/BrokerTest.java Mon Jun 22 22:25:22 2009
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.activemq.legacy.openwireprotocol;
+package org.apache.activemq.openwire;
import java.util.ArrayList;
import java.util.List;
@@ -29,11 +29,12 @@
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerInfo;
-import org.apache.activemq.command.LocalTransactionId;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.SessionInfo;
+import org.apache.activemq.legacy.openwireprotocol.BrokerTestSupport;
+import org.apache.activemq.legacy.openwireprotocol.StubConnection;
public class BrokerTest extends BrokerTestSupport {
@@ -498,6 +499,11 @@
connection.send(closeConnectionInfo(connectionInfo));
}
+ public void initCombosForTestConsumerCloseCausesRedelivery() {
+ addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT),
+ Integer.valueOf(DeliveryMode.PERSISTENT)});
+ addCombinationValues("destination", new Object[] {new ActiveMQQueue("TEST")});
+ }
public void testConsumerCloseCausesRedelivery() throws Exception {
@@ -604,80 +610,6 @@
assertNoMessagesLeft(connection2);
}
- public void initCombosForTestGroupedMessagesDeliveredToOnlyOneConsumer() {
- addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT),
- Integer.valueOf(DeliveryMode.PERSISTENT)});
- }
-
- public void testGroupedMessagesDeliveredToOnlyOneConsumer() throws Exception {
-
- ActiveMQDestination destination = new ActiveMQQueue("TEST");
-
- // Setup a first connection
- StubConnection connection1 = createConnection();
- ConnectionInfo connectionInfo1 = createConnectionInfo();
- SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
- ProducerInfo producerInfo = createProducerInfo(sessionInfo1);
- connection1.send(connectionInfo1);
- connection1.send(sessionInfo1);
- connection1.send(producerInfo);
-
- ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination);
- consumerInfo1.setPrefetchSize(1);
- connection1.request(consumerInfo1);
-
- ArrayList<Message> msgs = new ArrayList<Message>(4);
-
- // Send the messages.
- for (int i = 0; i < 1; i++) {
- Message message = createMessage(producerInfo, destination, deliveryMode);
- message.setGroupID("TEST-GROUP");
- message.setGroupSequence(i + 1);
- msgs.add(i, message);
- connection1.request(message);
- }
-
- // Setup a second connection
- StubConnection connection2 = createConnection();
- ConnectionInfo connectionInfo2 = createConnectionInfo();
- SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2);
- connection2.send(connectionInfo2);
- connection2.send(sessionInfo2);
-
- ConsumerInfo consumerInfo2 = createConsumerInfo(sessionInfo2, destination);
- consumerInfo2.setPrefetchSize(1);
- connection2.request(consumerInfo2);
-
- // Send the messages.
- for (int i = 1; i < 4; i++) {
- Message message = createMessage(producerInfo, destination, deliveryMode);
- message.setGroupID("TEST-GROUP");
- message.setGroupSequence(i + 1);
- msgs.add(i, message);
- connection1.request(message);
- }
-
- // All the messages should have been sent down connection 1.. just get
- // the first 3
- for (int i = 0; i < 3; i++) {
- Message m1 = receiveMessage(connection1);
- assertEquals("Unexpected Message Receipt: " + m1, m1.getMessageId(), msgs.get(i).getMessageId());
- connection1.send(createAck(consumerInfo1, m1, 1, MessageAck.STANDARD_ACK_TYPE));
- }
-
- // Close the first consumer.
- connection1.request(closeConsumerInfo(consumerInfo1));
-
- // The last messages should now go the the second consumer.
- for (int i = 3; i < 4; i++) {
- Message m1 = receiveMessage(connection2);
- assertEquals("Unexpected Message Receipt: " + m1, m1.getMessageId(), msgs.get(i).getMessageId());
- connection2.request(createAck(consumerInfo2, m1, 1, MessageAck.STANDARD_ACK_TYPE));
- }
-
- assertNoMessagesLeft(connection2);
- }
-
public void initCombosForTestTopicConsumerOnlySeeMessagesAfterCreation() {
addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT),
Integer.valueOf(DeliveryMode.PERSISTENT)});
@@ -1171,7 +1103,7 @@
ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination);
consumerInfo1.setRetroactive(true);
consumerInfo1.setPrefetchSize(100);
- connection1.send(consumerInfo1);
+ connection1.request(consumerInfo1);
// Setup a second connection
StubConnection connection2 = createConnection();
@@ -1182,7 +1114,7 @@
consumerInfo2.setPrefetchSize(100);
connection2.send(connectionInfo2);
connection2.send(sessionInfo2);
- connection2.send(consumerInfo2);
+ connection2.request(consumerInfo2);
// Send the messages
connection1.send(createMessage(producerInfo1, destination, deliveryMode));
@@ -1545,215 +1477,6 @@
connection.request(createAck(consumerInfo, m3, 1, MessageAck.DELIVERED_ACK_TYPE));
}
- public void initCombosForTestTransactedAckWithPrefetchOfOne() {
- addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT),
- Integer.valueOf(DeliveryMode.PERSISTENT)});
- addCombinationValues("destinationType",
- new Object[] {Byte.valueOf(ActiveMQDestination.QUEUE_TYPE),
- Byte.valueOf(ActiveMQDestination.TOPIC_TYPE),
- Byte.valueOf(ActiveMQDestination.TEMP_QUEUE_TYPE),
- Byte.valueOf(ActiveMQDestination.TEMP_TOPIC_TYPE)});
- }
-
- public void testTransactedAckWithPrefetchOfOne() throws Exception {
-
- // Setup a first connection
- StubConnection connection1 = createConnection();
- ConnectionInfo connectionInfo1 = createConnectionInfo();
- SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
- ProducerInfo producerInfo1 = createProducerInfo(sessionInfo1);
- connection1.send(connectionInfo1);
- connection1.send(sessionInfo1);
- connection1.send(producerInfo1);
-
- destination = createDestinationInfo(connection1, connectionInfo1, destinationType);
-
- ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination);
- consumerInfo1.setPrefetchSize(1);
- connection1.send(consumerInfo1);
-
- // Send the messages
- for (int i = 0; i < 4; i++) {
- Message message = createMessage(producerInfo1, destination, deliveryMode);
- connection1.send(message);
- }
-
-
-
- // Now get the messages.
- for (int i = 0; i < 4; i++) {
- // Begin the transaction.
- LocalTransactionId txid = createLocalTransaction(sessionInfo1);
- connection1.send(createBeginTransaction(connectionInfo1, txid));
- Message m1 = receiveMessage(connection1);
- assertNotNull(m1);
- MessageAck ack = createAck(consumerInfo1, m1, 1, MessageAck.STANDARD_ACK_TYPE);
- ack.setTransactionId(txid);
- connection1.send(ack);
- // Commit the transaction.
- connection1.send(createCommitTransaction1Phase(connectionInfo1, txid));
- }
- assertNoMessagesLeft(connection1);
- }
-
- public void initCombosForTestTransactedSend() {
- addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT),
- Integer.valueOf(DeliveryMode.PERSISTENT)});
- addCombinationValues("destinationType",
- new Object[] {Byte.valueOf(ActiveMQDestination.QUEUE_TYPE),
- Byte.valueOf(ActiveMQDestination.TOPIC_TYPE),
- Byte.valueOf(ActiveMQDestination.TEMP_QUEUE_TYPE),
- Byte.valueOf(ActiveMQDestination.TEMP_TOPIC_TYPE)});
- }
-
- public void testTransactedSend() throws Exception {
-
- // Setup a first connection
- StubConnection connection1 = createConnection();
- ConnectionInfo connectionInfo1 = createConnectionInfo();
- SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
- ProducerInfo producerInfo1 = createProducerInfo(sessionInfo1);
- connection1.send(connectionInfo1);
- connection1.send(sessionInfo1);
- connection1.send(producerInfo1);
-
- destination = createDestinationInfo(connection1, connectionInfo1, destinationType);
-
- ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination);
- consumerInfo1.setPrefetchSize(100);
- connection1.send(consumerInfo1);
-
- // Begin the transaction.
- LocalTransactionId txid = createLocalTransaction(sessionInfo1);
- connection1.send(createBeginTransaction(connectionInfo1, txid));
-
- // Send the messages
- for (int i = 0; i < 4; i++) {
- Message message = createMessage(producerInfo1, destination, deliveryMode);
- message.setTransactionId(txid);
- connection1.request(message);
- }
-
- // The point of this test is that message should not be delivered until
- // send is committed.
- assertNull(receiveMessage(connection1,MAX_NULL_WAIT));
-
- // Commit the transaction.
- connection1.send(createCommitTransaction1Phase(connectionInfo1, txid));
-
- // Now get the messages.
- for (int i = 0; i < 4; i++) {
- Message m1 = receiveMessage(connection1);
- assertNotNull(m1);
- }
-
- assertNoMessagesLeft(connection1);
- }
-
- public void initCombosForTestQueueTransactedAck() {
- addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT),
- Integer.valueOf(DeliveryMode.PERSISTENT)});
- addCombinationValues("destinationType",
- new Object[] {Byte.valueOf(ActiveMQDestination.QUEUE_TYPE),
- Byte.valueOf(ActiveMQDestination.TEMP_QUEUE_TYPE)});
- }
-
- public void testQueueTransactedAck() throws Exception {
-
- // Setup a first connection
- StubConnection connection1 = createConnection();
- ConnectionInfo connectionInfo1 = createConnectionInfo();
- SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
- ProducerInfo producerInfo1 = createProducerInfo(sessionInfo1);
- connection1.send(connectionInfo1);
- connection1.send(sessionInfo1);
- connection1.send(producerInfo1);
-
- destination = createDestinationInfo(connection1, connectionInfo1, destinationType);
-
- ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination);
- consumerInfo1.setPrefetchSize(100);
- connection1.send(consumerInfo1);
-
- // Send the messages
- for (int i = 0; i < 4; i++) {
- Message message = createMessage(producerInfo1, destination, deliveryMode);
- connection1.send(message);
- }
-
- // Begin the transaction.
- LocalTransactionId txid = createLocalTransaction(sessionInfo1);
- connection1.send(createBeginTransaction(connectionInfo1, txid));
-
- // Acknowledge the first 2 messages.
- for (int i = 0; i < 2; i++) {
- Message m1 = receiveMessage(connection1);
- assertNotNull("m1 is null for index: " + i, m1);
- MessageAck ack = createAck(consumerInfo1, m1, 1, MessageAck.STANDARD_ACK_TYPE);
- ack.setTransactionId(txid);
- connection1.request(ack);
- }
-
- // Commit the transaction.
- connection1.send(createCommitTransaction1Phase(connectionInfo1, txid));
-
- // The queue should now only have the remaining 2 messages
- assertEquals(2, countMessagesInQueue(connection1, connectionInfo1, destination));
- }
-
- public void testTopicRetroactiveConsumerSeeMessagesBeforeCreation() throws Exception {
-
- ActiveMQDestination destination = new ActiveMQTopic("TEST");
-
- // Setup a first connection
- StubConnection connection1 = createConnection();
- ConnectionInfo connectionInfo1 = createConnectionInfo();
- connectionInfo1.setClientId("A");
- SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
- ProducerInfo producerInfo1 = createProducerInfo(sessionInfo1);
- connection1.send(connectionInfo1);
- connection1.send(sessionInfo1);
- connection1.send(producerInfo1);
-
- // Send the messages
- Message m = createMessage(producerInfo1, destination, deliveryMode);
- connection1.send(m);
-
- // Create the durable subscription.
- ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination);
- if (durableConsumer) {
- consumerInfo1.setSubscriptionName("test");
- }
- consumerInfo1.setPrefetchSize(100);
- consumerInfo1.setRetroactive(true);
- connection1.send(consumerInfo1);
-
- connection1.send(createMessage(producerInfo1, destination, deliveryMode));
- connection1.request(createMessage(producerInfo1, destination, deliveryMode));
-
- // the behavior is VERY dependent on the recovery policy used.
- // But the default broker settings try to make it as consistent as
- // possible
-
- // Subscription should see all messages sent.
- Message m2 = receiveMessage(connection1);
- assertNotNull(m2);
- assertEquals(m.getMessageId(), m2.getMessageId());
- for (int i = 0; i < 2; i++) {
- m2 = receiveMessage(connection1);
- assertNotNull(m2);
- }
-
- assertNoMessagesLeft(connection1);
- }
-
- public void initCombosForTestConsumerCloseCausesRedelivery() {
- addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT),
- Integer.valueOf(DeliveryMode.PERSISTENT)});
- addCombinationValues("destination", new Object[] {new ActiveMQQueue("TEST")});
- }
-
-
public static Test suite() {
return suite(BrokerTest.class);
}
Copied: activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/openwire/MarshallingBrokerTest.java (from r787419, activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/legacy/openwireprotocol/MarshallingBrokerTest.java)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/openwire/MarshallingBrokerTest.java?p2=activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/openwire/MarshallingBrokerTest.java&p1=activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/legacy/openwireprotocol/MarshallingBrokerTest.java&r1=787419&r2=787427&rev=787427&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/legacy/openwireprotocol/MarshallingBrokerTest.java (original)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/openwire/MarshallingBrokerTest.java Mon Jun 22 22:25:22 2009
@@ -14,12 +14,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.activemq.legacy.openwireprotocol;
+package org.apache.activemq.openwire;
import java.net.URI;
import junit.framework.Test;
+import org.apache.activemq.legacy.openwireprotocol.StubConnection;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.transport.TransportFactory;
import org.apache.activemq.wireformat.WireFormat;
@@ -40,11 +41,13 @@
OpenWireFormat wf2 = new OpenWireFormat();
wf2.setCacheEnabled(true);
- addCombinationValues("wireFormat", new Object[] {wf1, wf2, });
+ addCombinationValues("wireFormat", new Object[] {wf1, wf2});
}
- protected StubConnection createConnection() throws Exception {
- return new StubConnection(TransportFactory.connect(new URI(PIPE_URI+"?marshall=true")));
+
+ @Override
+ protected String getBindURI() {
+ return PIPE_URI+"?marshal=true";
}
public static Test suite() {