You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2016/04/14 13:25:31 UTC
activemq git commit: https://issues.apache.org/jira/browse/AMQ-6240 -
Tidy up the test and reduce its duration. Reuse closeTimeout for the rollback
issued during close, which is the case here.
Repository: activemq
Updated Branches:
refs/heads/master 15dc6cc76 -> 530c1a819
https://issues.apache.org/jira/browse/AMQ-6240 - Tidy up the test and reduce its duration. Reuse closeTimeout for the rollback issued during close, which is the case here.
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/530c1a81
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/530c1a81
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/530c1a81
Branch: refs/heads/master
Commit: 530c1a81934d0a1ca51a8778e59ee19509378d87
Parents: 15dc6cc
Author: gtully <ga...@gmail.com>
Authored: Thu Apr 14 12:25:11 2016 +0100
Committer: gtully <ga...@gmail.com>
Committed: Thu Apr 14 12:25:11 2016 +0100
----------------------------------------------------------------------
.../org/apache/activemq/TransactionContext.java | 2 +-
.../org/apache/activemq/bugs/AMQ6240Test.java | 47 ++++++++++++--------
2 files changed, 29 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/530c1a81/activemq-client/src/main/java/org/apache/activemq/TransactionContext.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/TransactionContext.java b/activemq-client/src/main/java/org/apache/activemq/TransactionContext.java
index d59604a..efe12c4 100755
--- a/activemq-client/src/main/java/org/apache/activemq/TransactionContext.java
+++ b/activemq-client/src/main/java/org/apache/activemq/TransactionContext.java
@@ -277,7 +277,7 @@ public class TransactionContext implements XAResource {
TransactionInfo info = new TransactionInfo(getConnectionId(), transactionId, TransactionInfo.ROLLBACK);
this.transactionId = null;
//make this synchronous - see https://issues.apache.org/activemq/browse/AMQ-2364
- this.connection.doSyncSendPacket(info, this.connection.getSendTimeout() > 0 && this.connection.isClosing() ? this.connection.getSendTimeout() : 0);
+ this.connection.doSyncSendPacket(info, this.connection.isClosing() ? this.connection.getCloseTimeout() : 0);
// Notify the listener that the tx was rolled back
if (localTransactionEventListener != null) {
localTransactionEventListener.rollbackEvent();
http://git-wip-us.apache.org/repos/asf/activemq/blob/530c1a81/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6240Test.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6240Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6240Test.java
index fadb350..f13336c 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6240Test.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6240Test.java
@@ -17,76 +17,85 @@
package org.apache.activemq.bugs;
import org.apache.activemq.ActiveMQConnection;
-import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.EmbeddedBrokerTestSupport;
+import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.apache.activemq.transport.RequestTimedOutIOException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.jms.*;
+import javax.jms.BytesMessage;
+import javax.jms.JMSException;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
import java.util.concurrent.atomic.AtomicInteger;
-public class AMQ6240Test extends JmsTimeoutTest {
+public class AMQ6240Test extends EmbeddedBrokerTestSupport {
static final Logger LOG = LoggerFactory.getLogger(AMQ6240Test.class);
-
- public boolean isPersistent() { return true;}
-
public void testBlockedTxProducerConnectionTimeoutConnectionCanClose() throws Exception {
final ActiveMQConnection cx = (ActiveMQConnection)createConnection();
final ActiveMQDestination queue = createDestination("noPfc");
- // we should not take longer than 10 seconds to return from send
- cx.setSendTimeout(10000);
+ cx.setSendTimeout(4000);
+ cx.setCloseTimeout(1000);
+ final AtomicInteger exceptionCount = new AtomicInteger(0);
Runnable r = new Runnable() {
public void run() {
+ int count=0;
try {
LOG.info("Sender thread starting");
Session session = cx.createSession(true, Session.SESSION_TRANSACTED);
MessageProducer producer = session.createProducer(queue);
- producer.setDeliveryMode(DeliveryMode.PERSISTENT);
- TextMessage message = session.createTextMessage(createMessageText());
- for(int count=0; count<messageCount; count++){
+ BytesMessage message = session.createBytesMessage();
+ message.writeBytes(new byte[8*1024]);
+ for(; count<100; count++){
producer.send(message);
}
LOG.info("Done sending..");
} catch (JMSException e) {
if (e.getCause() instanceof RequestTimedOutIOException) {
exceptionCount.incrementAndGet();
+ LOG.info("Got expected send time out on message: " + count);
} else {
e.printStackTrace();
}
return;
}
-
}
};
cx.start();
Thread producerThread = new Thread(r);
producerThread.start();
- producerThread.join(15000);
+ producerThread.join(7000);
cx.close();
- // We should have a few timeout exceptions as memory store will fill up
+ // We should have a few timeout exceptions as store will fill up
assertTrue("No exception from the broker", exceptionCount.get() > 0);
}
-
- protected void setUp() throws Exception {
- super.setUp();
+ @Override
+ protected BrokerService createBroker() throws Exception {
+ BrokerService answer = new BrokerService();
+ answer.getManagementContext().setCreateConnector(false);
+ answer.addConnector(bindAddress);
PolicyMap policyMap = new PolicyMap();
PolicyEntry noProducerFlowControl = new PolicyEntry();
noProducerFlowControl.setProducerFlowControl(false);
policyMap.put(new ActiveMQQueue("noPfc"), noProducerFlowControl);
- broker.setDestinationPolicy(policyMap);
- broker.getSystemUsage().getStoreUsage().setLimit(50*1024*1024);
+ answer.setDestinationPolicy(policyMap);
+ KahaDBPersistenceAdapter kahaDBPersistenceAdapter = new KahaDBPersistenceAdapter();
+ kahaDBPersistenceAdapter.setJournalMaxFileLength(16*1024);
+ answer.getSystemUsage().getStoreUsage().setLimit(34*1024);
+ answer.setDeleteAllMessagesOnStartup(true);
+ return answer;
}
}