You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2012/08/01 16:46:19 UTC
svn commit: r1368034 -
/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ActiveMQXAConnectionFactoryTest.java
Author: tabish
Date: Wed Aug 1 14:46:18 2012
New Revision: 1368034
URL: http://svn.apache.org/viewvc?rev=1368034&view=rev
Log:
Add to the test case to show there's no problem for: https://issues.apache.org/jira/browse/AMQ-3285
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ActiveMQXAConnectionFactoryTest.java
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ActiveMQXAConnectionFactoryTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ActiveMQXAConnectionFactoryTest.java?rev=1368034&r1=1368033&r2=1368034&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ActiveMQXAConnectionFactoryTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ActiveMQXAConnectionFactoryTest.java Wed Aug 1 14:46:18 2012
@@ -26,7 +26,6 @@ import java.util.concurrent.CopyOnWriteA
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
-import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.TextMessage;
@@ -50,9 +49,7 @@ import org.apache.activemq.command.Conne
import org.apache.activemq.command.TransactionInfo;
import org.apache.activemq.command.XATransactionId;
import org.apache.activemq.management.JMSConnectionStatsImpl;
-import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.failover.FailoverTransport;
-import org.apache.activemq.transport.stomp.StompTransportFilter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -82,6 +79,21 @@ public class ActiveMQXAConnectionFactory
assertTrue(cf.isUseAsyncSend());
// the broker url have been adjusted.
assertEquals("vm:(broker:()/localhost)", cf.getBrokerURL());
+
+ cf = new ActiveMQXAConnectionFactory(
+ "vm://localhost?jms.redeliveryPolicy.maximumRedeliveries=10&" +
+ "jms.redeliveryPolicy.initialRedeliveryDelay=10000&" +
+ "jms.redeliveryPolicy.redeliveryDelay=10000&" +
+ "jms.redeliveryPolicy.useExponentialBackOff=true&" +
+ "jms.redeliveryPolicy.backOffMultiplier=2");
+ assertEquals(10, cf.getRedeliveryPolicy().getMaximumRedeliveries());
+ assertEquals(10000, cf.getRedeliveryPolicy().getInitialRedeliveryDelay());
+ assertEquals(10000, cf.getRedeliveryPolicy().getRedeliveryDelay());
+ assertEquals(true, cf.getRedeliveryPolicy().isUseExponentialBackOff());
+ assertEquals(2.0, cf.getRedeliveryPolicy().getBackOffMultiplier(), 0.1);
+
+ // the broker url have been adjusted.
+ assertEquals("vm://localhost", cf.getBrokerURL());
}
public void testCreateVMConnectionWithEmbdeddBroker() throws URISyntaxException, JMSException {
@@ -120,34 +132,34 @@ public class ActiveMQXAConnectionFactory
public void testCreateTcpConnectionUsingKnownPort() throws Exception {
assertCreateConnection("tcp://localhost:61610?wireFormat.tcpNoDelayEnabled=true");
}
-
+
public void testIsSameRM() throws URISyntaxException, JMSException, XAException {
-
+
ActiveMQXAConnectionFactory cf1 = new ActiveMQXAConnectionFactory("vm://localhost?broker.persistent=false");
XAConnection connection1 = (XAConnection)cf1.createConnection();
XASession session1 = connection1.createXASession();
XAResource resource1 = session1.getXAResource();
-
+
ActiveMQXAConnectionFactory cf2 = new ActiveMQXAConnectionFactory("vm://localhost?broker.persistent=false");
XAConnection connection2 = (XAConnection)cf2.createConnection();
XASession session2 = connection2.createXASession();
XAResource resource2 = session2.getXAResource();
assertTrue(resource1.isSameRM(resource2));
-
+
connection1.close();
connection2.close();
}
public void testVanilaTransactionalProduceReceive() throws Exception {
-
+
ActiveMQXAConnectionFactory cf1 = new ActiveMQXAConnectionFactory("vm://localhost?broker.persistent=false");
XAConnection connection1 = (XAConnection)cf1.createConnection();
connection1.start();
XASession session = connection1.createXASession();
XAResource resource = session.getXAResource();
Destination dest = new ActiveMQQueue(getName());
-
+
// publish a message
Xid tid = createXid();
resource.start(tid, XAResource.TMNOFLAGS);
@@ -158,7 +170,7 @@ public class ActiveMQXAConnectionFactory
resource.end(tid, XAResource.TMSUCCESS);
resource.commit(tid, true);
session.close();
-
+
session = connection1.createXASession();
MessageConsumer consumer = session.createConsumer(dest);
tid = createXid();
@@ -170,16 +182,16 @@ public class ActiveMQXAConnectionFactory
resource.end(tid, XAResource.TMSUCCESS);
resource.commit(tid, true);
}
-
+
public void testConsumerCloseTransactionalSendReceive() throws Exception {
-
+
ActiveMQXAConnectionFactory cf1 = new ActiveMQXAConnectionFactory("vm://localhost?broker.persistent=false");
XAConnection connection1 = (XAConnection)cf1.createConnection();
connection1.start();
XASession session = connection1.createXASession();
XAResource resource = session.getXAResource();
Destination dest = new ActiveMQQueue(getName());
-
+
// publish a message
Xid tid = createXid();
resource.start(tid, XAResource.TMNOFLAGS);
@@ -191,7 +203,7 @@ public class ActiveMQXAConnectionFactory
resource.end(tid, XAResource.TMSUCCESS);
resource.commit(tid, true);
session.close();
-
+
session = connection1.createXASession();
MessageConsumer consumer = session.createConsumer(dest);
tid = createXid();
@@ -203,7 +215,7 @@ public class ActiveMQXAConnectionFactory
assertEquals(getName(), receivedMessage.getText());
resource.end(tid, XAResource.TMSUCCESS);
resource.commit(tid, true);
-
+
session = connection1.createXASession();
consumer = session.createConsumer(dest);
tid = createXid();
@@ -212,18 +224,18 @@ public class ActiveMQXAConnectionFactory
assertNull(consumer.receive(1000));
resource.end(tid, XAResource.TMSUCCESS);
resource.commit(tid, true);
-
+
}
public void testSessionCloseTransactionalSendReceive() throws Exception {
-
+
ActiveMQXAConnectionFactory cf1 = new ActiveMQXAConnectionFactory("vm://localhost?broker.persistent=false");
XAConnection connection1 = (XAConnection)cf1.createConnection();
connection1.start();
XASession session = connection1.createXASession();
XAResource resource = session.getXAResource();
Destination dest = new ActiveMQQueue(getName());
-
+
// publish a message
Xid tid = createXid();
resource.start(tid, XAResource.TMNOFLAGS);
@@ -234,8 +246,8 @@ public class ActiveMQXAConnectionFactory
session.close();
resource.end(tid, XAResource.TMSUCCESS);
resource.commit(tid, true);
-
-
+
+
session = connection1.createXASession();
MessageConsumer consumer = session.createConsumer(dest);
tid = createXid();
@@ -247,7 +259,7 @@ public class ActiveMQXAConnectionFactory
assertEquals(getName(), receivedMessage.getText());
resource.end(tid, XAResource.TMSUCCESS);
resource.commit(tid, true);
-
+
session = connection1.createXASession();
consumer = session.createConsumer(dest);
tid = createXid();
@@ -255,7 +267,7 @@ public class ActiveMQXAConnectionFactory
resource.start(tid, XAResource.TMNOFLAGS);
assertNull(consumer.receive(1000));
resource.end(tid, XAResource.TMSUCCESS);
- resource.commit(tid, true);
+ resource.commit(tid, true);
}
@@ -269,36 +281,36 @@ public class ActiveMQXAConnectionFactory
ActiveMQXAConnection xaConnection = (ActiveMQXAConnection)cf1.createConnection();
xaConnection.start();
XASession session = xaConnection.createXASession();
- XAResource resource = session.getXAResource();
+ XAResource resource = session.getXAResource();
Xid tid = createXid();
resource.start(tid, XAResource.TMNOFLAGS);
session.close();
resource.end(tid, XAResource.TMSUCCESS);
resource.commit(tid, true);
-
+
assertTransactionGoneFromBroker(tid);
assertTransactionGoneFromConnection(brokerName, xaConnection.getClientID(), xaConnection.getConnectionInfo().getConnectionId(), tid);
assertSessionGone(xaConnection, session);
assertTransactionGoneFromFailoverState(xaConnection, tid);
-
+
// two phase
session = xaConnection.createXASession();
- resource = session.getXAResource();
+ resource = session.getXAResource();
tid = createXid();
resource.start(tid, XAResource.TMNOFLAGS);
session.close();
resource.end(tid, XAResource.TMSUCCESS);
assertEquals(XAResource.XA_RDONLY, resource.prepare(tid));
-
- // no need for a commit on read only
+
+ // no need for a commit on read only
assertTransactionGoneFromBroker(tid);
assertTransactionGoneFromConnection(brokerName, xaConnection.getClientID(), xaConnection.getConnectionInfo().getConnectionId(), tid);
assertSessionGone(xaConnection, session);
assertTransactionGoneFromFailoverState(xaConnection, tid);
-
+
xaConnection.close();
broker.stop();
-
+
}
public void testCloseSendConnection() throws Exception {
@@ -328,11 +340,11 @@ public class ActiveMQXAConnectionFactory
private void assertTransactionGoneFromFailoverState(
ActiveMQXAConnection connection1, Xid tid) throws Exception {
-
+
FailoverTransport transport = (FailoverTransport) connection1.getTransport().narrow(FailoverTransport.class);
TransactionInfo info = new TransactionInfo(connection1.getConnectionInfo().getConnectionId(), new XATransactionId(tid), TransactionInfo.COMMIT_ONE_PHASE);
- assertNull("transaction shold not exist in the state tracker",
- transport.getStateTracker().processCommitTransactionOnePhase(info));
+ assertNull("transaction shold not exist in the state tracker",
+ transport.getStateTracker().processCommitTransactionOnePhase(info));
}
private void assertSessionGone(ActiveMQXAConnection connection1,
@@ -351,7 +363,7 @@ public class ActiveMQXAConnectionFactory
connection.processPrepareTransaction(new TransactionInfo(connectionId, new XATransactionId(tid), TransactionInfo.PREPARE));
fail("did not get expected excepton on missing transaction, it must be still there in error!");
} catch (IllegalStateException expectedOnNoTransaction) {
- }
+ }
}
}
}
@@ -410,9 +422,9 @@ public class ActiveMQXAConnectionFactory
assertTrue("Should be an XATopicConnection", connection instanceof XATopicConnection);
assertTrue("Should be an XAQueueConnection", connection instanceof XAQueueConnection);
}
-
+
public Xid createXid() throws IOException {
-
+
ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream os = new DataOutputStream(baos);
os.writeLong(++txGenerator);