You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2013/01/07 11:01:25 UTC
svn commit: r1429726 - in /qpid/trunk/qpid/java:
broker/src/main/java/org/apache/qpid/server/
broker/src/main/java/org/apache/qpid/server/protocol/v1_0/
broker/src/main/java/org/apache/qpid/server/queue/
broker/src/main/java/org/apache/qpid/server/tran...
Author: kwall
Date: Mon Jan 7 10:01:25 2013
New Revision: 1429726
URL: http://svn.apache.org/viewvc?rev=1429726&view=rev
Log:
QPID-3569: Refactor TransactionTimeout
* Moved the duplicated transactionUpdateTime member from AMQChannel/ServerSession to ServerTransaction.
** LocalTransaction now maintains advances transactionUpdateTime on each enqueue/dequeue operation
** Other non-transactional ServerTransaction impls return transactionUpdateTime of 0 (as they already do for transactionStartTime).
** Changed LocalTransaction so that transaction start time is recorded on first enqueue or dequeue operation (rather than only
first enqueue)
* Moved duplicated logic from AMQChannel/ServerSession#checkTransactionStatus to TransactionTimeoutHelper
* Make TransactionTimeoutTests use a durable queue so it is actually testing with store transactions.
* Removed warnings if operational logging is turned off.
Modified:
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/TransactionTimeoutHelper.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DistributedTransaction.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/TransactionTimeoutHelperTest.java
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/AsyncAutoCommitTransactionTest.java
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/AutoCommitTransactionTest.java
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTest.java
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTestCase.java
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?rev=1429726&r1=1429725&r2=1429726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java Mon Jan 7 10:01:25 2013
@@ -51,6 +51,7 @@ import org.apache.qpid.framing.MethodReg
import org.apache.qpid.framing.abstraction.ContentChunk;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.TransactionTimeoutHelper.CloseAction;
import org.apache.qpid.server.ack.UnacknowledgedMessageMap;
import org.apache.qpid.server.ack.UnacknowledgedMessageMapImpl;
import org.apache.qpid.server.exchange.Exchange;
@@ -87,6 +88,7 @@ import org.apache.qpid.server.subscripti
import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
import org.apache.qpid.server.txn.AsyncAutoCommitTransaction;
import org.apache.qpid.server.txn.LocalTransaction;
+import org.apache.qpid.server.txn.LocalTransaction.ActivityTimeAccessor;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.transport.TransportException;
@@ -145,7 +147,6 @@ public class AMQChannel implements AMQSe
private final AtomicLong _txnCommits = new AtomicLong(0);
private final AtomicLong _txnRejects = new AtomicLong(0);
private final AtomicLong _txnCount = new AtomicLong(0);
- private final AtomicLong _txnUpdateTime = new AtomicLong(0);
private final AMQProtocolSession _session;
private AtomicBoolean _closing = new AtomicBoolean(false);
@@ -185,15 +186,29 @@ public class AMQChannel implements AMQSe
// by default the session is non-transactional
_transaction = new AsyncAutoCommitTransaction(_messageStore, this);
- _clientDeliveryMethod = session.createDeliveryMethod(_channelId);
+ _clientDeliveryMethod = session.createDeliveryMethod(_channelId);
- _transactionTimeoutHelper = new TransactionTimeoutHelper(_logSubject);
+ _transactionTimeoutHelper = new TransactionTimeoutHelper(_logSubject, new CloseAction()
+ {
+ @Override
+ public void doTimeoutAction(String reason) throws AMQException
+ {
+ closeConnection(reason);
+ }
+ });
}
/** Sets this channel to be part of a local transaction */
public void setLocalTransactional()
{
- _transaction = new LocalTransaction(_messageStore);
+ _transaction = new LocalTransaction(_messageStore, new ActivityTimeAccessor()
+ {
+ @Override
+ public long getActivityTime()
+ {
+ return _session.getLastReceivedTime();
+ }
+ });
_txnStarts.incrementAndGet();
}
@@ -207,8 +222,6 @@ public class AMQChannel implements AMQSe
sync();
}
-
-
private void incrementOutstandingTxnsIfNecessary()
{
if(isTransactional())
@@ -229,11 +242,6 @@ public class AMQChannel implements AMQSe
}
}
- public Long getTxnStarts()
- {
- return _txnStarts.get();
- }
-
public Long getTxnCommits()
{
return _txnCommits.get();
@@ -351,9 +359,8 @@ public class AMQChannel implements AMQSe
}
});
- _transaction.enqueue(destinationQueues, _currentMessage, new MessageDeliveryAction(_currentMessage, destinationQueues), getProtocolSession().getLastReceivedTime());
+ _transaction.enqueue(destinationQueues, _currentMessage, new MessageDeliveryAction(_currentMessage, destinationQueues));
incrementOutstandingTxnsIfNecessary();
- updateTransactionalActivity();
_currentMessage.getStoredMessage().flushToStore();
}
}
@@ -839,7 +846,6 @@ public class AMQChannel implements AMQSe
{
Collection<QueueEntry> ackedMessages = getAckedMessages(deliveryTag, multiple);
_transaction.dequeue(ackedMessages, new MessageAcknowledgeAction(ackedMessages));
- updateTransactionalActivity();
}
private Collection<QueueEntry> getAckedMessages(long deliveryTag, boolean multiple)
@@ -959,7 +965,6 @@ public class AMQChannel implements AMQSe
_txnCommits.incrementAndGet();
_txnStarts.incrementAndGet();
decrementOutstandingTxnsIfNecessary();
- _txnUpdateTime.set(0);
}
});
}
@@ -969,7 +974,6 @@ public class AMQChannel implements AMQSe
_txnCommits.incrementAndGet();
_txnStarts.incrementAndGet();
- _txnUpdateTime.set(0);
decrementOutstandingTxnsIfNecessary();
}
}
@@ -1007,7 +1011,6 @@ public class AMQChannel implements AMQSe
_txnRejects.incrementAndGet();
_txnStarts.incrementAndGet();
- _txnUpdateTime.set(0);
decrementOutstandingTxnsIfNecessary();
}
@@ -1036,19 +1039,6 @@ public class AMQChannel implements AMQSe
}
}
-
-
- }
-
- /**
- * Update last transaction activity timestamp
- */
- private void updateTransactionalActivity()
- {
- if (isTransactional())
- {
- _txnUpdateTime.set(getProtocolSession().getLastReceivedTime());
- }
}
public String toString()
@@ -1215,11 +1205,6 @@ public class AMQChannel implements AMQSe
// TODO
throw new RuntimeException(e);
}
-
-
-
-
-
}
public void onRollback()
@@ -1369,7 +1354,6 @@ public class AMQChannel implements AMQSe
public void onRollback()
{
- //To change body of implemented methods use File | Settings | File Templates.
}
}
@@ -1478,37 +1462,9 @@ public class AMQChannel implements AMQSe
return _createTime;
}
- public void mgmtClose() throws AMQException
- {
- _session.mgmtCloseChannel(_channelId);
- }
-
public void checkTransactionStatus(long openWarn, long openClose, long idleWarn, long idleClose) throws AMQException
{
- final long transactionStartTime = _transaction.getTransactionStartTime();
- final long transactionUpdateTime = _txnUpdateTime.get();
- if (isTransactional() && transactionUpdateTime > 0 && transactionStartTime > 0)
- {
- long currentTime = System.currentTimeMillis();
- long openTime = currentTime - transactionStartTime;
- long idleTime = currentTime - transactionUpdateTime;
-
- _transactionTimeoutHelper.logIfNecessary(idleTime, idleWarn, ChannelMessages.IDLE_TXN(idleTime),
- TransactionTimeoutHelper.IDLE_TRANSACTION_ALERT);
- if (_transactionTimeoutHelper.isTimedOut(idleTime, idleClose))
- {
- closeConnection("Idle transaction timed out");
- return;
- }
-
- _transactionTimeoutHelper.logIfNecessary(openTime, openWarn, ChannelMessages.OPEN_TXN(openTime),
- TransactionTimeoutHelper.OPEN_TRANSACTION_ALERT);
- if (_transactionTimeoutHelper.isTimedOut(openTime, openClose))
- {
- closeConnection("Open transaction timed out");
- return;
- }
- }
+ _transactionTimeoutHelper.checkIdleOrOpenTimes(_transaction, openWarn, openClose, idleWarn, idleClose);
}
/**
@@ -1628,14 +1584,8 @@ public class AMQChannel implements AMQSe
_action.postCommit();
_action = null;
}
-
- boolean isReadyForCompletion()
- {
- return _future.isComplete();
- }
}
-
@Override
public int getConsumerCount()
{
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/TransactionTimeoutHelper.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/TransactionTimeoutHelper.java?rev=1429726&r1=1429725&r2=1429726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/TransactionTimeoutHelper.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/TransactionTimeoutHelper.java Mon Jan 7 10:01:25 2013
@@ -18,46 +18,85 @@
*/
package org.apache.qpid.server;
-import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.logging.LogMessage;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.messages.ChannelMessages;
+import org.apache.qpid.server.txn.ServerTransaction;
public class TransactionTimeoutHelper
{
- private static final Logger LOGGER = Logger.getLogger(TransactionTimeoutHelper.class);
-
- public static final String IDLE_TRANSACTION_ALERT = "IDLE TRANSACTION ALERT";
- public static final String OPEN_TRANSACTION_ALERT = "OPEN TRANSACTION ALERT";
+ private static final String OPEN_TRANSACTION_TIMEOUT_ERROR = "Open transaction timed out";
+ private static final String IDLE_TRANSACTION_TIMEOUT_ERROR = "Idle transaction timed out";
private final LogSubject _logSubject;
- public TransactionTimeoutHelper(final LogSubject logSubject)
+ private final CloseAction _closeAction;
+
+ public TransactionTimeoutHelper(final LogSubject logSubject, final CloseAction closeAction)
{
_logSubject = logSubject;
+ _closeAction = closeAction;
}
- public void logIfNecessary(final long timeSoFar, final long warnTimeout,
- final LogMessage message, final String alternateLogPrefix)
+ public void checkIdleOrOpenTimes(ServerTransaction transaction, long openWarn, long openClose, long idleWarn, long idleClose) throws AMQException
{
- if (isTimedOut(timeSoFar, warnTimeout))
+ if (transaction.isTransactional())
{
- LogActor logActor = CurrentActor.get();
- if(logActor.getRootMessageLogger().isMessageEnabled(logActor, _logSubject, message.getLogHierarchy()))
+ final long transactionUpdateTime = transaction.getTransactionUpdateTime();
+ if(transactionUpdateTime > 0)
{
- logActor.message(_logSubject, message);
+ long idleTime = System.currentTimeMillis() - transactionUpdateTime;
+ boolean closed = logAndCloseIfNecessary(idleTime, idleWarn, idleClose, ChannelMessages.IDLE_TXN(idleTime), IDLE_TRANSACTION_TIMEOUT_ERROR);
+ if (closed)
+ {
+ return; // no point proceeding to check the open time
+ }
}
- else
+
+ final long transactionStartTime = transaction.getTransactionStartTime();
+ if(transactionStartTime > 0)
{
- LOGGER.warn(alternateLogPrefix + " " + _logSubject.toLogString() + " " + timeSoFar + " ms");
+ long openTime = System.currentTimeMillis() - transactionStartTime;
+ logAndCloseIfNecessary(openTime, openWarn, openClose, ChannelMessages.OPEN_TXN(openTime), OPEN_TRANSACTION_TIMEOUT_ERROR);
}
}
}
- public boolean isTimedOut(long timeSoFar, long timeout)
+ /**
+ * @return true iff closeTimeout was exceeded
+ */
+ private boolean logAndCloseIfNecessary(final long timeSoFar,
+ final long warnTimeout, final long closeTimeout,
+ final LogMessage warnMessage, final String closeMessage) throws AMQException
+ {
+ if (isTimedOut(timeSoFar, warnTimeout))
+ {
+ LogActor logActor = CurrentActor.get();
+ logActor.message(_logSubject, warnMessage);
+ }
+
+ if(isTimedOut(timeSoFar, closeTimeout))
+ {
+ _closeAction.doTimeoutAction(closeMessage);
+ return true;
+ }
+ else
+ {
+ return false;
+ }
+ }
+
+ private boolean isTimedOut(long timeSoFar, long timeout)
{
return timeout > 0L && timeSoFar > timeout;
}
+
+ public interface CloseAction
+ {
+ void doTimeoutAction(String reason) throws AMQException;
+ }
+
}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java?rev=1429726&r1=1429725&r2=1429726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java Mon Jan 7 10:01:25 2013
@@ -80,7 +80,7 @@ public class ExchangeDestination impleme
{
// NO-OP
}
- }, System.currentTimeMillis());
+ });
return ACCEPTED;
}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java?rev=1429726&r1=1429725&r2=1429726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java Mon Jan 7 10:01:25 2013
@@ -454,7 +454,7 @@ public abstract class QueueEntryImpl imp
{
}
- }, 0L);
+ });
txn.dequeue(currentQueue, message, new ServerTransaction.Action()
{
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java?rev=1429726&r1=1429725&r2=1429726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java Mon Jan 7 10:01:25 2013
@@ -1398,7 +1398,7 @@ public class SimpleAMQQueue implements A
{
}
- }, 0L);
+ });
txn.dequeue(this, entry.getMessage(),
new ServerTransaction.Action()
{
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java?rev=1429726&r1=1429725&r2=1429726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java Mon Jan 7 10:01:25 2013
@@ -43,6 +43,7 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.AMQStoreException;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.TransactionTimeoutHelper;
+import org.apache.qpid.server.TransactionTimeoutHelper.CloseAction;
import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.logging.actors.CurrentActor;
@@ -132,7 +133,6 @@ public class ServerSession extends Sessi
private final AtomicLong _txnCommits = new AtomicLong(0);
private final AtomicLong _txnRejects = new AtomicLong(0);
private final AtomicLong _txnCount = new AtomicLong(0);
- private final AtomicLong _txnUpdateTime = new AtomicLong(0);
private Map<String, Subscription_0_10> _subscriptions = new ConcurrentHashMap<String, Subscription_0_10>();
@@ -147,7 +147,14 @@ public class ServerSession extends Sessi
_transaction = new AsyncAutoCommitTransaction(this.getMessageStore(),this);
_logSubject = new ChannelLogSubject(this);
- _transactionTimeoutHelper = new TransactionTimeoutHelper(_logSubject);
+ _transactionTimeoutHelper = new TransactionTimeoutHelper(_logSubject, new CloseAction()
+ {
+ @Override
+ public void doTimeoutAction(String reason) throws AMQException
+ {
+ getConnectionModel().closeSession(ServerSession.this, AMQConstant.RESOURCE_ERROR, reason);
+ }
+ });
}
protected void setState(State state)
@@ -186,9 +193,8 @@ public class ServerSession extends Sessi
}
getConnectionModel().registerMessageReceived(message.getSize(), message.getArrivalTime());
PostEnqueueAction postTransactionAction = new PostEnqueueAction(queues, message, isTransactional()) ;
- _transaction.enqueue(queues,message, postTransactionAction, 0L);
+ _transaction.enqueue(queues,message, postTransactionAction);
incrementOutstandingTxnsIfNecessary();
- updateTransactionalActivity();
}
@@ -402,7 +408,6 @@ public class ServerSession extends Sessi
entry.release();
}
});
- updateTransactionalActivity();
}
public Collection<Subscription_0_10> getSubscriptions()
@@ -549,7 +554,6 @@ public class ServerSession extends Sessi
_txnCommits.incrementAndGet();
_txnStarts.incrementAndGet();
- _txnUpdateTime.set(0);
decrementOutstandingTxnsIfNecessary();
}
@@ -559,7 +563,6 @@ public class ServerSession extends Sessi
_txnRejects.incrementAndGet();
_txnStarts.incrementAndGet();
- _txnUpdateTime.set(0);
decrementOutstandingTxnsIfNecessary();
}
@@ -584,22 +587,6 @@ public class ServerSession extends Sessi
}
}
- /**
- * Update last transaction activity timestamp
- */
- private void updateTransactionalActivity()
- {
- if (isTransactional())
- {
- _txnUpdateTime.set(System.currentTimeMillis());
- }
- }
-
- public Long getTxnStarts()
- {
- return _txnStarts.get();
- }
-
public Long getTxnCommits()
{
return _txnCommits.get();
@@ -705,30 +692,7 @@ public class ServerSession extends Sessi
public void checkTransactionStatus(long openWarn, long openClose, long idleWarn, long idleClose) throws AMQException
{
- final long transactionStartTime = _transaction.getTransactionStartTime();
- final long transactionUpdateTime = _txnUpdateTime.get();
- if (isTransactional() && transactionUpdateTime > 0 && transactionStartTime > 0)
- {
- long currentTime = System.currentTimeMillis();
- long openTime = currentTime - transactionStartTime;
- long idleTime = currentTime - transactionUpdateTime;
-
- _transactionTimeoutHelper.logIfNecessary(idleTime, idleWarn, ChannelMessages.IDLE_TXN(idleTime),
- TransactionTimeoutHelper.IDLE_TRANSACTION_ALERT);
- if (_transactionTimeoutHelper.isTimedOut(idleTime, idleClose))
- {
- getConnectionModel().closeSession(this, AMQConstant.RESOURCE_ERROR, "Idle transaction timed out");
- return;
- }
-
- _transactionTimeoutHelper.logIfNecessary(openTime, openWarn, ChannelMessages.OPEN_TXN(openTime),
- TransactionTimeoutHelper.OPEN_TRANSACTION_ALERT);
- if (_transactionTimeoutHelper.isTimedOut(openTime, openClose))
- {
- getConnectionModel().closeSession(this, AMQConstant.RESOURCE_ERROR, "Open transaction timed out");
- return;
- }
- }
+ _transactionTimeoutHelper.checkIdleOrOpenTimes(_transaction, openWarn, openClose, idleWarn, idleClose);
}
public void block(AMQQueue queue)
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java?rev=1429726&r1=1429725&r2=1429726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java Mon Jan 7 10:01:25 2013
@@ -66,11 +66,18 @@ public class AsyncAutoCommitTransaction
_futureRecorder = recorder;
}
+ @Override
public long getTransactionStartTime()
{
return 0L;
}
+ @Override
+ public long getTransactionUpdateTime()
+ {
+ return 0L;
+ }
+
/**
* Since AutoCommitTransaction have no concept of a long lived transaction, any Actions registered
* by the caller are executed immediately.
@@ -241,7 +248,7 @@ public class AsyncAutoCommitTransaction
}
- public void enqueue(List<? extends BaseQueue> queues, EnqueableMessage message, Action postTransactionAction, long currentTime)
+ public void enqueue(List<? extends BaseQueue> queues, EnqueableMessage message, Action postTransactionAction)
{
Transaction txn = null;
try
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java?rev=1429726&r1=1429725&r2=1429726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java Mon Jan 7 10:01:25 2013
@@ -52,11 +52,18 @@ public class AutoCommitTransaction imple
_messageStore = transactionLog;
}
+ @Override
public long getTransactionStartTime()
{
return 0L;
}
+ @Override
+ public long getTransactionUpdateTime()
+ {
+ return 0L;
+ }
+
/**
* Since AutoCommitTransaction have no concept of a long lived transaction, any Actions registered
* by the caller are executed immediately.
@@ -178,7 +185,7 @@ public class AutoCommitTransaction imple
}
- public void enqueue(List<? extends BaseQueue> queues, EnqueableMessage message, Action postTransactionAction, long currentTime)
+ public void enqueue(List<? extends BaseQueue> queues, EnqueableMessage message, Action postTransactionAction)
{
Transaction txn = null;
try
@@ -270,4 +277,6 @@ public class AutoCommitTransaction imple
}
}
+
+
}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DistributedTransaction.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DistributedTransaction.java?rev=1429726&r1=1429725&r2=1429726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DistributedTransaction.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DistributedTransaction.java Mon Jan 7 10:01:25 2013
@@ -26,7 +26,6 @@ import org.apache.qpid.server.protocol.A
import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.Transaction;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.transport.Xid;
@@ -39,10 +38,6 @@ public class DistributedTransaction impl
private final AutoCommitTransaction _autoCommitTransaction;
- private volatile Transaction _transaction;
-
- private long _txnStartTime = 0L;
-
private DtxBranch _branch;
private AMQSessionModel _session;
private VirtualHost _vhost;
@@ -55,9 +50,16 @@ public class DistributedTransaction impl
_autoCommitTransaction = new AutoCommitTransaction(vhost.getMessageStore());
}
+ @Override
public long getTransactionStartTime()
{
- return _txnStartTime;
+ return 0;
+ }
+
+ @Override
+ public long getTransactionUpdateTime()
+ {
+ return 0;
}
public void addPostTransactionAction(Action postTransactionAction)
@@ -107,7 +109,7 @@ public class DistributedTransaction impl
{
_branch.enqueue(queue, message);
_branch.addPostTransactionAcion(postTransactionAction);
- enqueue(Collections.singletonList(queue), message, postTransactionAction, System.currentTimeMillis());
+ enqueue(Collections.singletonList(queue), message, postTransactionAction);
}
else
{
@@ -116,7 +118,7 @@ public class DistributedTransaction impl
}
public void enqueue(List<? extends BaseQueue> queues, EnqueableMessage message,
- Action postTransactionAction, long currentTime)
+ Action postTransactionAction)
{
if(_branch != null)
{
@@ -128,7 +130,7 @@ public class DistributedTransaction impl
}
else
{
- _autoCommitTransaction.enqueue(queues, message, postTransactionAction, currentTime);
+ _autoCommitTransaction.enqueue(queues, message, postTransactionAction);
}
}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java?rev=1429726&r1=1429725&r2=1429726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java Mon Jan 7 10:01:25 2013
@@ -49,25 +49,42 @@ public class LocalTransaction implements
private final List<Action> _postTransactionActions = new ArrayList<Action>();
private volatile Transaction _transaction;
- private MessageStore _transactionLog;
+ private final ActivityTimeAccessor _activityTime;
+ private final MessageStore _transactionLog;
private volatile long _txnStartTime = 0L;
+ private volatile long _txnUpdateTime = 0l;
private StoreFuture _asyncTran;
public LocalTransaction(MessageStore transactionLog)
{
- _transactionLog = transactionLog;
+ this(transactionLog, new ActivityTimeAccessor()
+ {
+ @Override
+ public long getActivityTime()
+ {
+ return System.currentTimeMillis();
+ }
+ });
}
-
- public boolean inTransaction()
+
+ public LocalTransaction(MessageStore transactionLog, ActivityTimeAccessor activityTime)
{
- return _transaction != null;
+ _transactionLog = transactionLog;
+ _activityTime = activityTime;
}
+ @Override
public long getTransactionStartTime()
{
return _txnStartTime;
}
+ @Override
+ public long getTransactionUpdateTime()
+ {
+ return _txnUpdateTime;
+ }
+
public void addPostTransactionAction(Action postTransactionAction)
{
sync();
@@ -78,6 +95,7 @@ public class LocalTransaction implements
{
sync();
_postTransactionActions.add(postTransactionAction);
+ initTransactionStartTimeIfNecessaryAndAdvanceUpdateTime();
if(message.isPersistent() && queue.isDurable())
{
@@ -104,6 +122,7 @@ public class LocalTransaction implements
{
sync();
_postTransactionActions.add(postTransactionAction);
+ initTransactionStartTimeIfNecessaryAndAdvanceUpdateTime();
try
{
@@ -180,6 +199,7 @@ public class LocalTransaction implements
{
sync();
_postTransactionActions.add(postTransactionAction);
+ initTransactionStartTimeIfNecessaryAndAdvanceUpdateTime();
if(message.isPersistent() && queue.isDurable())
{
@@ -189,7 +209,7 @@ public class LocalTransaction implements
{
_logger.debug("Enqueue of message number " + message.getMessageNumber() + " to transaction log. Queue : " + queue.getNameShortString());
}
-
+
beginTranIfNecessary();
_transaction.enqueueMessage(queue, message);
}
@@ -202,15 +222,11 @@ public class LocalTransaction implements
}
}
- public void enqueue(List<? extends BaseQueue> queues, EnqueableMessage message, Action postTransactionAction, long currentTime)
+ public void enqueue(List<? extends BaseQueue> queues, EnqueableMessage message, Action postTransactionAction)
{
sync();
_postTransactionActions.add(postTransactionAction);
-
- if (_txnStartTime == 0L)
- {
- _txnStartTime = currentTime == 0L ? System.currentTimeMillis() : currentTime;
- }
+ initTransactionStartTimeIfNecessaryAndAdvanceUpdateTime();
if(message.isPersistent())
{
@@ -224,8 +240,7 @@ public class LocalTransaction implements
{
_logger.debug("Enqueue of message number " + message.getMessageNumber() + " to transaction log. Queue : " + queue.getNameShortString() );
}
-
-
+
beginTranIfNecessary();
_transaction.enqueueMessage(queue, message);
}
@@ -378,8 +393,6 @@ public class LocalTransaction implements
}
throw new RuntimeException("Failed to commit transaction", e);
}
-
-
}
private void doPostTransactionActions()
@@ -437,16 +450,34 @@ public class LocalTransaction implements
}
}
+ private void initTransactionStartTimeIfNecessaryAndAdvanceUpdateTime()
+ {
+ long currentTime = _activityTime.getActivityTime();
+
+ if (_txnStartTime == 0)
+ {
+ _txnStartTime = currentTime;
+ }
+ _txnUpdateTime = currentTime;
+ }
+
private void resetDetails()
{
_asyncTran = null;
_transaction = null;
- _postTransactionActions.clear();
+ _postTransactionActions.clear();
_txnStartTime = 0L;
+ _txnUpdateTime = 0;
}
public boolean isTransactional()
{
return true;
}
+
+ public interface ActivityTimeAccessor
+ {
+ long getActivityTime();
+ }
+
}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java?rev=1429726&r1=1429725&r2=1429726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java Mon Jan 7 10:01:25 2013
@@ -55,11 +55,18 @@ public interface ServerTransaction
/**
* Return the time the current transaction started.
- *
+ *
* @return the time this transaction started or 0 if not in a transaction
*/
long getTransactionStartTime();
+ /**
+ * Return the time of the last activity on the current transaction.
+ *
+ * @return the time of the last activity or 0 if not in a transaction
+ */
+ long getTransactionUpdateTime();
+
/**
* Register an Action for execution after transaction commit or rollback. Actions
* will be executed in the order in which they are registered.
@@ -92,7 +99,7 @@ public interface ServerTransaction
*
* Store operations will result only for a persistent messages on durable queues.
*/
- void enqueue(List<? extends BaseQueue> queues, EnqueableMessage message, Action postTransactionAction, long currentTime);
+ void enqueue(List<? extends BaseQueue> queues, EnqueableMessage message, Action postTransactionAction);
/**
* Commit the transaction represented by this object.
Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/TransactionTimeoutHelperTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/TransactionTimeoutHelperTest.java?rev=1429726&r1=1429725&r2=1429726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/TransactionTimeoutHelperTest.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/TransactionTimeoutHelperTest.java Mon Jan 7 10:01:25 2013
@@ -18,67 +18,131 @@
*/
package org.apache.qpid.server;
-import static org.mockito.Matchers.any;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.qpid.server.logging.messages.ChannelMessages.IDLE_TXN_LOG_HIERARCHY;
+import static org.apache.qpid.server.logging.messages.ChannelMessages.OPEN_TXN_LOG_HIERARCHY;
+import static org.mockito.Matchers.argThat;
import static org.mockito.Matchers.same;
import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.when;
+import org.apache.qpid.server.TransactionTimeoutHelper.CloseAction;
import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.logging.LogMessage;
import org.apache.qpid.server.logging.LogSubject;
-import org.apache.qpid.server.logging.RootMessageLogger;
import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.test.utils.QpidTestCase;
+import org.hamcrest.Description;
+import org.mockito.ArgumentMatcher;
public class TransactionTimeoutHelperTest extends QpidTestCase
{
- private final LogMessage _logMessage = mock(LogMessage.class);
private final LogActor _logActor = mock(LogActor.class);
private final LogSubject _logSubject = mock(LogSubject.class);
+ private final ServerTransaction _transaction = mock(ServerTransaction.class);
+ private final CloseAction _closeAction = mock(CloseAction.class);
private TransactionTimeoutHelper _transactionTimeoutHelper;
- private RootMessageLogger _rootMessageLogger;
+ private long _now;
- public void testLogIfNecessary()
+ public void testNotTransactional() throws Exception
{
- _transactionTimeoutHelper.logIfNecessary(99, 100, _logMessage, "");
- verifyZeroInteractions(_logActor, _logMessage);
+ when(_transaction.isTransactional()).thenReturn(false);
- _transactionTimeoutHelper.logIfNecessary(101, 100, _logMessage, "");
- verify(_logActor).message(_logSubject, _logMessage);
+ _transactionTimeoutHelper.checkIdleOrOpenTimes(_transaction, 5, 10, 5, 10);
+
+ verifyZeroInteractions(_logActor, _closeAction);
+ }
+
+ public void testOpenTransactionProducesWarningOnly() throws Exception
+ {
+ final long sixtyOneSecondsAgo = _now - SECONDS.toMillis(61);
+
+ configureMockTransaction(sixtyOneSecondsAgo, sixtyOneSecondsAgo);
+
+ _transactionTimeoutHelper.checkIdleOrOpenTimes(_transaction, SECONDS.toMillis(30), 0, 0, 0);
+
+ verify(_logActor).message(same(_logSubject), isLogMessage(OPEN_TXN_LOG_HIERARCHY, "CHN-1007 : Open Transaction : 61,\\d{3} ms"));
+ verifyZeroInteractions(_closeAction);
+ }
+
+ public void testOpenTransactionProducesTimeoutActionOnly() throws Exception
+ {
+ final long sixtyOneSecondsAgo = _now - SECONDS.toMillis(61);
+
+ configureMockTransaction(sixtyOneSecondsAgo, sixtyOneSecondsAgo);
+
+ _transactionTimeoutHelper.checkIdleOrOpenTimes(_transaction, 0, SECONDS.toMillis(30), 0, 0);
+
+ verify(_closeAction).doTimeoutAction("Open transaction timed out");
+ verifyZeroInteractions(_logActor);
+ }
+
+ public void testOpenTransactionProducesWarningAndTimeoutAction() throws Exception
+ {
+ final long sixtyOneSecondsAgo = _now - SECONDS.toMillis(61);
+
+ configureMockTransaction(sixtyOneSecondsAgo, sixtyOneSecondsAgo);
+
+ _transactionTimeoutHelper.checkIdleOrOpenTimes(_transaction, SECONDS.toMillis(15), SECONDS.toMillis(30), 0, 0);
+
+ verify(_logActor).message(same(_logSubject), isLogMessage(OPEN_TXN_LOG_HIERARCHY, "CHN-1007 : Open Transaction : 61,\\d{3} ms"));
+ verify(_closeAction).doTimeoutAction("Open transaction timed out");
+ }
+
+ public void testIdleTransactionProducesWarningOnly() throws Exception
+ {
+ final long sixtyOneSecondsAgo = _now - SECONDS.toMillis(61);
+ final long thrityOneSecondsAgo = _now - SECONDS.toMillis(31);
+
+ configureMockTransaction(sixtyOneSecondsAgo, thrityOneSecondsAgo);
+
+ _transactionTimeoutHelper.checkIdleOrOpenTimes(_transaction, 0, 0, SECONDS.toMillis(30), 0);
+
+ verify(_logActor).message(same(_logSubject), isLogMessage(IDLE_TXN_LOG_HIERARCHY, "CHN-1008 : Idle Transaction : 31,\\d{3} ms"));
+ verifyZeroInteractions(_closeAction);
}
- public void testLogIfNecessaryWhenOperationalLoggingDisabled()
+ public void testIdleTransactionProducesTimeoutActionOnly() throws Exception
{
- //disable the operational logging
- when(_rootMessageLogger.isMessageEnabled(
- same(_logActor), any(LogSubject.class), any(String.class)))
- .thenReturn(false);
+ final long sixtyOneSecondsAgo = _now - SECONDS.toMillis(61);
+ final long thrityOneSecondsAgo = _now - SECONDS.toMillis(31);
+
+ configureMockTransaction(sixtyOneSecondsAgo, thrityOneSecondsAgo);
+
+ _transactionTimeoutHelper.checkIdleOrOpenTimes(_transaction, 0, 0, 0, SECONDS.toMillis(30));
- //verify the actor is never asked to log a message
- _transactionTimeoutHelper.logIfNecessary(101, 100, _logMessage, "");
- verify(_logActor, never()).message(any(LogMessage.class));
- verify(_logActor, never()).message(any(LogSubject.class), any(LogMessage.class));
+ verify(_closeAction).doTimeoutAction("Idle transaction timed out");
+ verifyZeroInteractions(_logActor);
}
- public void testIsTimedOut()
+ public void testIdleTransactionProducesWarningAndTimeoutAction() throws Exception
{
- assertFalse("Shouldn't have timed out", _transactionTimeoutHelper.isTimedOut(199,200));
- assertTrue("Should have timed out", _transactionTimeoutHelper.isTimedOut(201,200));
+ final long sixtyOneSecondsAgo = _now - SECONDS.toMillis(61);
+ final long thrityOneSecondsAgo = _now - SECONDS.toMillis(31);
+
+ configureMockTransaction(sixtyOneSecondsAgo, thrityOneSecondsAgo);
+
+ _transactionTimeoutHelper.checkIdleOrOpenTimes(_transaction, 0, 0, SECONDS.toMillis(15), SECONDS.toMillis(30));
+
+ verify(_logActor).message(same(_logSubject), isLogMessage(IDLE_TXN_LOG_HIERARCHY, "CHN-1008 : Idle Transaction : 31,\\d{3} ms"));
+ verify(_closeAction).doTimeoutAction("Idle transaction timed out");
}
- /**
- * If TransactionTimeout is disabled, the timeout will be 0. This test verifies
- * that the helper methods respond negatively in this scenario.
- */
- public void testTransactionTimeoutDisabled()
+ public void testIdleAndOpenWarnings() throws Exception
{
- assertFalse("Shouldn't have timed out", _transactionTimeoutHelper.isTimedOut(201,0));
+ final long sixtyOneSecondsAgo = _now - SECONDS.toMillis(61);
+ final long thirtyOneSecondsAgo = _now - SECONDS.toMillis(31);
+
+ configureMockTransaction(sixtyOneSecondsAgo, thirtyOneSecondsAgo);
+
+ _transactionTimeoutHelper.checkIdleOrOpenTimes(_transaction, SECONDS.toMillis(60), 0, SECONDS.toMillis(30), 0);
- _transactionTimeoutHelper.logIfNecessary(99, 0, _logMessage, "");
- verifyZeroInteractions(_logActor, _logMessage);
+ verify(_logActor).message(same(_logSubject), isLogMessage(IDLE_TXN_LOG_HIERARCHY, "CHN-1008 : Idle Transaction : 31,\\d{3} ms"));
+ verify(_logActor).message(same(_logSubject), isLogMessage(OPEN_TXN_LOG_HIERARCHY, "CHN-1007 : Open Transaction : 61,\\d{3} ms"));
+ verifyZeroInteractions(_closeAction);
}
@Override
@@ -88,14 +152,79 @@ public class TransactionTimeoutHelperTes
CurrentActor.set(_logActor);
- _rootMessageLogger = mock(RootMessageLogger.class);
- when(_logActor.getRootMessageLogger()).thenReturn(_rootMessageLogger);
+ _transactionTimeoutHelper = new TransactionTimeoutHelper(_logSubject, _closeAction);
+ _now = System.currentTimeMillis();
+ }
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ try
+ {
+ super.tearDown();
+ }
+ finally
+ {
+ CurrentActor.remove();
+ }
+ }
- when(_rootMessageLogger.isMessageEnabled(
- same(_logActor), any(LogSubject.class), any(String.class)))
- .thenReturn(true);
+ private void configureMockTransaction(final long startTime, final long updateTime)
+ {
+ when(_transaction.isTransactional()).thenReturn(true);
+ when(_transaction.getTransactionStartTime()).thenReturn(startTime);
+ when(_transaction.getTransactionUpdateTime()).thenReturn(updateTime);
+ }
- _transactionTimeoutHelper = new TransactionTimeoutHelper(_logSubject);
+ private LogMessage isLogMessage(String expectedlogHierarchy, String expectedText)
+ {
+ return argThat(new IsLogMessage(expectedlogHierarchy, expectedText));
}
+ class IsLogMessage extends ArgumentMatcher<LogMessage>
+ {
+ private final String _expectedLogHierarchy;
+ private final String _expectedLogMessageMatches;
+ private String _hierarchyMatchesFailure;
+ private String _logMessageMatchesFailure;
+
+ public IsLogMessage(String expectedlogHierarchy, String expectedLogMessageMatches)
+ {
+ _expectedLogHierarchy = expectedlogHierarchy;
+ _expectedLogMessageMatches = expectedLogMessageMatches;
+ }
+
+ public boolean matches(Object arg)
+ {
+ LogMessage logMessage = (LogMessage)arg;
+
+ boolean hierarchyMatches = logMessage.getLogHierarchy().equals(_expectedLogHierarchy);
+ boolean logMessageMatches = logMessage.toString().matches(_expectedLogMessageMatches);
+
+ if (!hierarchyMatches)
+ {
+ _hierarchyMatchesFailure = "LogHierarchy does not match. Expected " + _expectedLogHierarchy + " actual " + logMessage.getLogHierarchy();
+ }
+
+ if (!logMessageMatches)
+ {
+ _logMessageMatchesFailure = "LogMessage does not match. Expected " + _expectedLogMessageMatches + " actual " + logMessage.toString();
+ }
+
+ return hierarchyMatches && logMessageMatches;
+ }
+
+ @Override
+ public void describeTo(Description description)
+ {
+ if (_hierarchyMatchesFailure != null)
+ {
+ description.appendText(_hierarchyMatchesFailure);
+ }
+ if (_logMessageMatchesFailure != null)
+ {
+ description.appendText(_logMessageMatchesFailure);
+ }
+ }
+ }
}
Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java?rev=1429726&r1=1429725&r2=1429726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java Mon Jan 7 10:01:25 2013
@@ -659,7 +659,7 @@ public class SimpleAMQQueueTest extends
public void onRollback()
{
}
- }, 0L);
+ });
// Check that it is enqueued
AMQQueue data = store.getMessages().get(1L);
Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java?rev=1429726&r1=1429725&r2=1429726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java Mon Jan 7 10:01:25 2013
@@ -630,7 +630,7 @@ public class MessageStoreTest extends In
{
//To change body of implemented methods use File | Settings | File Templates.
}
- }, 0L);
+ });
}
}
Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/AsyncAutoCommitTransactionTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/AsyncAutoCommitTransactionTest.java?rev=1429726&r1=1429725&r2=1429726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/AsyncAutoCommitTransactionTest.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/AsyncAutoCommitTransactionTest.java Mon Jan 7 10:01:25 2013
@@ -82,7 +82,7 @@ public class AsyncAutoCommitTransactionT
AsyncAutoCommitTransaction asyncAutoCommitTransaction =
new AsyncAutoCommitTransaction(_messageStore, _futureRecorder);
- asyncAutoCommitTransaction.enqueue(Collections.singletonList(_queue), _message, _postTransactionAction, System.currentTimeMillis());
+ asyncAutoCommitTransaction.enqueue(Collections.singletonList(_queue), _message, _postTransactionAction);
verify(_storeTransaction).enqueueMessage(_queue, _message);
verify(_futureRecorder).recordFuture(_future, _postTransactionAction);
Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/AutoCommitTransactionTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/AutoCommitTransactionTest.java?rev=1429726&r1=1429725&r2=1429726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/AutoCommitTransactionTest.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/AutoCommitTransactionTest.java Mon Jan 7 10:01:25 2013
@@ -137,7 +137,7 @@ public class AutoCommitTransactionTest e
_message = createTestMessage(false);
_queues = createTestBaseQueues(new boolean[] {false, false, false});
- _transaction.enqueue(_queues, _message, _action, 0L);
+ _transaction.enqueue(_queues, _message, _action);
assertEquals("Enqueue of non-persistent message must not cause message to be enqueued", 0, _storeTransaction.getNumberOfEnqueuedMessages());
assertEquals("Unexpected transaction state", TransactionState.NOT_STARTED, _storeTransaction.getState());
@@ -157,7 +157,7 @@ public class AutoCommitTransactionTest e
_message = createTestMessage(true);
_queues = createTestBaseQueues(new boolean[] {false, false, false});
- _transaction.enqueue(_queues, _message, _action, 0L);
+ _transaction.enqueue(_queues, _message, _action);
assertEquals("Enqueue of persistent message to non-durable queues must not cause message to be enqueued", 0, _storeTransaction.getNumberOfEnqueuedMessages());
assertEquals("Unexpected transaction state", TransactionState.NOT_STARTED, _storeTransaction.getState());
@@ -175,7 +175,7 @@ public class AutoCommitTransactionTest e
_message = createTestMessage(true);
_queues = createTestBaseQueues(new boolean[] {false, true, false, true});
- _transaction.enqueue(_queues, _message, _action, 0L);
+ _transaction.enqueue(_queues, _message, _action);
assertEquals("Enqueue of persistent message to durable/non-durable queues must cause messages to be enqueued", 2, _storeTransaction.getNumberOfEnqueuedMessages());
assertEquals("Unexpected transaction state", TransactionState.COMMITTED, _storeTransaction.getState());
@@ -198,7 +198,7 @@ public class AutoCommitTransactionTest e
try
{
- _transaction.enqueue(_queues, _message, _action, 0L);
+ _transaction.enqueue(_queues, _message, _action);
fail("Exception not thrown");
}
catch (RuntimeException re)
Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java?rev=1429726&r1=1429725&r2=1429726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java Mon Jan 7 10:01:25 2013
@@ -140,7 +140,7 @@ public class LocalTransactionTest extend
_message = createTestMessage(false);
_queues = createTestBaseQueues(new boolean[] {false, false, false});
- _transaction.enqueue(_queues, _message, _action1, 0L);
+ _transaction.enqueue(_queues, _message, _action1);
assertEquals("Enqueue of non-persistent message must not cause message to be enqueued", 0, _storeTransaction.getNumberOfEnqueuedMessages());
assertEquals("Unexpected transaction state", TransactionState.NOT_STARTED, _storeTransaction.getState());
@@ -156,7 +156,7 @@ public class LocalTransactionTest extend
_message = createTestMessage(true);
_queues = createTestBaseQueues(new boolean[] {false, false, false});
- _transaction.enqueue(_queues, _message, _action1, 0L);
+ _transaction.enqueue(_queues, _message, _action1);
assertEquals("Enqueue of persistent message to non-durable queues must not cause message to be enqueued", 0, _storeTransaction.getNumberOfEnqueuedMessages());
assertEquals("Unexpected transaction state", TransactionState.NOT_STARTED, _storeTransaction.getState());
@@ -173,7 +173,7 @@ public class LocalTransactionTest extend
_message = createTestMessage(true);
_queues = createTestBaseQueues(new boolean[] {false, true, false, true});
- _transaction.enqueue(_queues, _message, _action1, 0L);
+ _transaction.enqueue(_queues, _message, _action1);
assertEquals("Enqueue of persistent message to durable/non-durable queues must cause messages to be enqueued", 2, _storeTransaction.getNumberOfEnqueuedMessages());
assertEquals("Unexpected transaction state", TransactionState.STARTED, _storeTransaction.getState());
@@ -196,7 +196,7 @@ public class LocalTransactionTest extend
try
{
- _transaction.enqueue(_queues, _message, _action1, 0L);
+ _transaction.enqueue(_queues, _message, _action1);
fail("Exception not thrown");
}
catch (RuntimeException re)
@@ -217,7 +217,7 @@ public class LocalTransactionTest extend
{
_message = createTestMessage(false);
_queue = createTestAMQQueue(false);
-
+
_transaction.dequeue(_queue, _message, _action1);
assertEquals("Dequeue of non-persistent message must not cause message to be enqueued", 0, _storeTransaction.getNumberOfEnqueuedMessages());
@@ -465,7 +465,6 @@ public class LocalTransactionTest extend
*/
public void testRollbackWorkWithAdditionalPostAction() throws Exception
{
-
_message = createTestMessage(true);
_queue = createTestAMQQueue(true);
@@ -482,6 +481,122 @@ public class LocalTransactionTest extend
assertTrue("Rollback action2 must be fired", _action1.isRollbackActionFired());
}
+ public void testFirstEnqueueRecordsTransactionStartAndUpdateTime() throws Exception
+ {
+ assertEquals("Unexpected transaction start time before test", 0, _transaction.getTransactionStartTime());
+ assertEquals("Unexpected transaction update time before test", 0, _transaction.getTransactionUpdateTime());
+
+ _message = createTestMessage(true);
+ _queue = createTestAMQQueue(true);
+
+ long startTime = System.currentTimeMillis();
+ _transaction.enqueue(_queue, _message, _action1);
+
+ assertTrue("Transaction start time should have been recorded", _transaction.getTransactionStartTime() >= startTime);
+ assertEquals("Transaction update time should be the same as transaction start time", _transaction.getTransactionStartTime(), _transaction.getTransactionUpdateTime());
+ }
+
+ public void testSubsequentEnqueueAdvancesTransactionUpdateTimeOnly() throws Exception
+ {
+ assertEquals("Unexpected transaction start time before test", 0, _transaction.getTransactionStartTime());
+ assertEquals("Unexpected transaction update time before test", 0, _transaction.getTransactionUpdateTime());
+
+ _message = createTestMessage(true);
+ _queue = createTestAMQQueue(true);
+
+ _transaction.enqueue(_queue, _message, _action1);
+
+ final long transactionStartTimeAfterFirstEnqueue = _transaction.getTransactionStartTime();
+ final long transactionUpdateTimeAfterFirstEnqueue = _transaction.getTransactionUpdateTime();
+
+ Thread.sleep(1);
+ _transaction.enqueue(_queue, _message, _action2);
+
+ final long transactionStartTimeAfterSecondEnqueue = _transaction.getTransactionStartTime();
+ final long transactionUpdateTimeAfterSecondEnqueue = _transaction.getTransactionUpdateTime();
+
+ assertEquals("Transaction start time after second enqueue should be unchanged", transactionStartTimeAfterFirstEnqueue, transactionStartTimeAfterSecondEnqueue);
+ assertTrue("Transaction update time after second enqueue should be greater than first update time", transactionUpdateTimeAfterSecondEnqueue > transactionUpdateTimeAfterFirstEnqueue);
+ }
+
+ public void testFirstDequeueRecordsTransactionStartAndUpdateTime() throws Exception
+ {
+ assertEquals("Unexpected transaction start time before test", 0, _transaction.getTransactionStartTime());
+ assertEquals("Unexpected transaction update time before test", 0, _transaction.getTransactionUpdateTime());
+
+ _message = createTestMessage(true);
+ _queue = createTestAMQQueue(true);
+
+ long startTime = System.currentTimeMillis();
+ _transaction.dequeue(_queue, _message, _action1);
+
+ assertTrue("Transaction start time should have been recorded", _transaction.getTransactionStartTime() >= startTime);
+ assertEquals("Transaction update time should be the same as transaction start time", _transaction.getTransactionStartTime(), _transaction.getTransactionUpdateTime());
+ }
+
+ public void testMixedEnqueuesAndDequeuesAdvancesTransactionUpdateTimeOnly() throws Exception
+ {
+ assertEquals("Unexpected transaction start time before test", 0, _transaction.getTransactionStartTime());
+ assertEquals("Unexpected transaction update time before test", 0, _transaction.getTransactionUpdateTime());
+
+ _message = createTestMessage(true);
+ _queue = createTestAMQQueue(true);
+
+ _transaction.enqueue(_queue, _message, _action1);
+
+ final long transactionStartTimeAfterFirstEnqueue = _transaction.getTransactionStartTime();
+ final long transactionUpdateTimeAfterFirstEnqueue = _transaction.getTransactionUpdateTime();
+
+ Thread.sleep(1);
+ _transaction.dequeue(_queue, _message, _action2);
+
+ final long transactionStartTimeAfterFirstDequeue = _transaction.getTransactionStartTime();
+ final long transactionUpdateTimeAfterFirstDequeue = _transaction.getTransactionUpdateTime();
+
+ assertEquals("Transaction start time after first dequeue should be unchanged", transactionStartTimeAfterFirstEnqueue, transactionStartTimeAfterFirstDequeue);
+ assertTrue("Transaction update time after first dequeue should be greater than first update time", transactionUpdateTimeAfterFirstDequeue > transactionUpdateTimeAfterFirstEnqueue);
+ }
+
+ public void testCommitResetsTransactionStartAndUpdateTime() throws Exception
+ {
+ assertEquals("Unexpected transaction start time before test", 0, _transaction.getTransactionStartTime());
+ assertEquals("Unexpected transaction update time before test", 0, _transaction.getTransactionUpdateTime());
+
+ _message = createTestMessage(true);
+ _queue = createTestAMQQueue(true);
+
+ long startTime = System.currentTimeMillis();
+ _transaction.enqueue(_queue, _message, _action1);
+
+ assertTrue(_transaction.getTransactionStartTime() >= startTime);
+ assertTrue(_transaction.getTransactionUpdateTime() >= startTime);
+
+ _transaction.commit();
+
+ assertEquals("Transaction start time should be reset after commit", 0, _transaction.getTransactionStartTime());
+ assertEquals("Transaction update time should be reset after commit", 0, _transaction.getTransactionUpdateTime());
+ }
+
+ public void testRollbackResetsTransactionStartAndUpdateTime() throws Exception
+ {
+ assertEquals("Unexpected transaction start time before test", 0, _transaction.getTransactionStartTime());
+ assertEquals("Unexpected transaction update time before test", 0, _transaction.getTransactionUpdateTime());
+
+ _message = createTestMessage(true);
+ _queue = createTestAMQQueue(true);
+
+ long startTime = System.currentTimeMillis();
+ _transaction.enqueue(_queue, _message, _action1);
+
+ assertTrue(_transaction.getTransactionStartTime() >= startTime);
+ assertTrue(_transaction.getTransactionUpdateTime() >= startTime);
+
+ _transaction.rollback();
+
+ assertEquals("Transaction start time should be reset after rollback", 0, _transaction.getTransactionStartTime());
+ assertEquals("Transaction update time should be reset after rollback", 0, _transaction.getTransactionUpdateTime());
+ }
+
private Collection<QueueEntry> createTestQueueEntries(boolean[] queueDurableFlags, boolean[] messagePersistentFlags)
{
Collection<QueueEntry> queueEntries = new ArrayList<QueueEntry>();
Modified: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTest.java?rev=1429726&r1=1429725&r2=1429726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTest.java (original)
+++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTest.java Mon Jan 7 10:01:25 2013
@@ -39,7 +39,7 @@ public class TransactionTimeoutTest exte
protected void configure() throws Exception
{
- // Setup housekeeping every second
+ // Setup housekeeping every 100ms
setConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST + ".housekeeping.checkPeriod", "100");
if (getName().contains("ProducerIdle"))
Modified: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTestCase.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTestCase.java?rev=1429726&r1=1429725&r2=1429726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTestCase.java (original)
+++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTestCase.java Mon Jan 7 10:01:25 2013
@@ -23,17 +23,13 @@ package org.apache.qpid.test.unit.transa
import junit.framework.TestCase;
import org.apache.qpid.AMQException;
-import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.AMQConnectionURL;
-import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.client.AMQSession;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.jms.ConnectionURL;
-import org.apache.qpid.jms.Session;
+import org.apache.qpid.configuration.ClientProperties;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.test.utils.QpidBrokerTestCase;
import org.apache.qpid.util.LogMonitor;
+import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
@@ -41,6 +37,7 @@ import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
+import javax.jms.Session;
import javax.jms.TextMessage;
import java.util.List;
import java.util.concurrent.CountDownLatch;
@@ -61,7 +58,7 @@ public abstract class TransactionTimeout
public static final String OPEN = "Open";
protected LogMonitor _monitor;
- protected AMQConnection _con;
+ protected Connection _con;
protected Session _psession, _csession;
protected Queue _queue;
protected MessageConsumer _consumer;
@@ -89,16 +86,14 @@ public abstract class TransactionTimeout
super.setUp();
// Connect to broker
- String broker = ("tcp://localhost:" + DEFAULT_PORT);
- ConnectionURL url = new AMQConnectionURL("amqp://guest:guest@clientid/test?brokerlist='" + broker + "'&maxprefetch='1'");
- _con = (AMQConnection) getConnection(url);
+ setTestClientSystemProperty(ClientProperties.MAX_PREFETCH_PROP_NAME, String.valueOf(1));
+ _con = getConnection();
_con.setExceptionListener(this);
_con.start();
// Create queue
Session qsession = _con.createSession(true, Session.SESSION_TRANSACTED);
- AMQShortString queueName = new AMQShortString("test");
- _queue = new AMQQueue(qsession.getDefaultQueueExchangeName(), queueName, queueName, false, true);
+ _queue = qsession.createQueue(getTestQueueName());
qsession.close();
// Create producer and consumer
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org