You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2013/01/07 11:01:25 UTC

svn commit: r1429726 - in /qpid/trunk/qpid/java: broker/src/main/java/org/apache/qpid/server/ broker/src/main/java/org/apache/qpid/server/protocol/v1_0/ broker/src/main/java/org/apache/qpid/server/queue/ broker/src/main/java/org/apache/qpid/server/tran...

Author: kwall
Date: Mon Jan  7 10:01:25 2013
New Revision: 1429726

URL: http://svn.apache.org/viewvc?rev=1429726&view=rev
Log:
QPID-3569: Refactor TransactionTimeout

* Moved the duplicated transactionUpdateTime member from AMQChannel/ServerSession to ServerTransaction.
  ** LocalTransaction now maintains advances transactionUpdateTime on each enqueue/dequeue operation
  ** Other non-transactional ServerTransaction impls return transactionUpdateTime of 0 (as they already do for transactionStartTime).
  ** Changed LocalTransaction so that transaction start time is recorded on first enqueue or dequeue operation (rather than only
     first enqueue)
* Moved duplicated logic from AMQChannel/ServerSession#checkTransactionStatus to TransactionTimeoutHelper
* Make TransactionTimeoutTests use a durable queue so it is actually testing with store transactions.
* Removed warnings if operational logging is turned off.

Modified:
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/TransactionTimeoutHelper.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DistributedTransaction.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/TransactionTimeoutHelperTest.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/AsyncAutoCommitTransactionTest.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/AutoCommitTransactionTest.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java
    qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTest.java
    qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTestCase.java

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?rev=1429726&r1=1429725&r2=1429726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java Mon Jan  7 10:01:25 2013
@@ -51,6 +51,7 @@ import org.apache.qpid.framing.MethodReg
 import org.apache.qpid.framing.abstraction.ContentChunk;
 import org.apache.qpid.framing.abstraction.MessagePublishInfo;
 import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.TransactionTimeoutHelper.CloseAction;
 import org.apache.qpid.server.ack.UnacknowledgedMessageMap;
 import org.apache.qpid.server.ack.UnacknowledgedMessageMapImpl;
 import org.apache.qpid.server.exchange.Exchange;
@@ -87,6 +88,7 @@ import org.apache.qpid.server.subscripti
 import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
 import org.apache.qpid.server.txn.AsyncAutoCommitTransaction;
 import org.apache.qpid.server.txn.LocalTransaction;
+import org.apache.qpid.server.txn.LocalTransaction.ActivityTimeAccessor;
 import org.apache.qpid.server.txn.ServerTransaction;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.transport.TransportException;
@@ -145,7 +147,6 @@ public class AMQChannel implements AMQSe
     private final AtomicLong _txnCommits = new AtomicLong(0);
     private final AtomicLong _txnRejects = new AtomicLong(0);
     private final AtomicLong _txnCount = new AtomicLong(0);
-    private final AtomicLong _txnUpdateTime = new AtomicLong(0);
 
     private final AMQProtocolSession _session;
     private AtomicBoolean _closing = new AtomicBoolean(false);
@@ -185,15 +186,29 @@ public class AMQChannel implements AMQSe
         // by default the session is non-transactional
         _transaction = new AsyncAutoCommitTransaction(_messageStore, this);
 
-         _clientDeliveryMethod = session.createDeliveryMethod(_channelId);
+        _clientDeliveryMethod = session.createDeliveryMethod(_channelId);
 
-         _transactionTimeoutHelper = new TransactionTimeoutHelper(_logSubject);
+        _transactionTimeoutHelper = new TransactionTimeoutHelper(_logSubject, new CloseAction()
+        {
+            @Override
+            public void doTimeoutAction(String reason) throws AMQException
+            {
+                closeConnection(reason);
+            }
+        });
     }
 
     /** Sets this channel to be part of a local transaction */
     public void setLocalTransactional()
     {
-        _transaction = new LocalTransaction(_messageStore);
+        _transaction = new LocalTransaction(_messageStore, new ActivityTimeAccessor()
+        {
+            @Override
+            public long getActivityTime()
+            {
+                return _session.getLastReceivedTime();
+            }
+        });
         _txnStarts.incrementAndGet();
     }
 
@@ -207,8 +222,6 @@ public class AMQChannel implements AMQSe
         sync();
     }
 
-
-
     private void incrementOutstandingTxnsIfNecessary()
     {
         if(isTransactional())
@@ -229,11 +242,6 @@ public class AMQChannel implements AMQSe
         }
     }
 
-    public Long getTxnStarts()
-    {
-        return _txnStarts.get();
-    }
-
     public Long getTxnCommits()
     {
         return _txnCommits.get();
@@ -351,9 +359,8 @@ public class AMQChannel implements AMQSe
                             }
                         });
 
-                        _transaction.enqueue(destinationQueues, _currentMessage, new MessageDeliveryAction(_currentMessage, destinationQueues), getProtocolSession().getLastReceivedTime());
+                        _transaction.enqueue(destinationQueues, _currentMessage, new MessageDeliveryAction(_currentMessage, destinationQueues));
                         incrementOutstandingTxnsIfNecessary();
-                        updateTransactionalActivity();
                         _currentMessage.getStoredMessage().flushToStore();
                     }
                 }
@@ -839,7 +846,6 @@ public class AMQChannel implements AMQSe
     {
         Collection<QueueEntry> ackedMessages = getAckedMessages(deliveryTag, multiple);
         _transaction.dequeue(ackedMessages, new MessageAcknowledgeAction(ackedMessages));
-	    updateTransactionalActivity();
     }
 
     private Collection<QueueEntry> getAckedMessages(long deliveryTag, boolean multiple)
@@ -959,7 +965,6 @@ public class AMQChannel implements AMQSe
                     _txnCommits.incrementAndGet();
                     _txnStarts.incrementAndGet();
                     decrementOutstandingTxnsIfNecessary();
-                    _txnUpdateTime.set(0);
                 }
             });
         }
@@ -969,7 +974,6 @@ public class AMQChannel implements AMQSe
 
             _txnCommits.incrementAndGet();
             _txnStarts.incrementAndGet();
-            _txnUpdateTime.set(0);
             decrementOutstandingTxnsIfNecessary();
         }
     }
@@ -1007,7 +1011,6 @@ public class AMQChannel implements AMQSe
 
             _txnRejects.incrementAndGet();
             _txnStarts.incrementAndGet();
-            _txnUpdateTime.set(0);
             decrementOutstandingTxnsIfNecessary();
         }
 
@@ -1036,19 +1039,6 @@ public class AMQChannel implements AMQSe
             }
 
         }
-
-
-    }
-
-    /**
-     * Update last transaction activity timestamp
-     */
-    private void updateTransactionalActivity()
-    {
-        if (isTransactional())
-        {
-            _txnUpdateTime.set(getProtocolSession().getLastReceivedTime());
-        }
     }
 
     public String toString()
@@ -1215,11 +1205,6 @@ public class AMQChannel implements AMQSe
                 // TODO
                 throw new RuntimeException(e);
             }
-
-
-
-
-
         }
 
         public void onRollback()
@@ -1369,7 +1354,6 @@ public class AMQChannel implements AMQSe
 
         public void onRollback()
         {
-            //To change body of implemented methods use File | Settings | File Templates.
         }
     }
 
@@ -1478,37 +1462,9 @@ public class AMQChannel implements AMQSe
         return _createTime;
     }
 
-    public void mgmtClose() throws AMQException
-    {
-        _session.mgmtCloseChannel(_channelId);
-    }
-
     public void checkTransactionStatus(long openWarn, long openClose, long idleWarn, long idleClose) throws AMQException
     {
-        final long transactionStartTime = _transaction.getTransactionStartTime();
-        final long transactionUpdateTime = _txnUpdateTime.get();
-        if (isTransactional() && transactionUpdateTime > 0 && transactionStartTime > 0)
-        {
-            long currentTime = System.currentTimeMillis();
-            long openTime = currentTime - transactionStartTime;
-            long idleTime = currentTime - transactionUpdateTime;
-
-            _transactionTimeoutHelper.logIfNecessary(idleTime, idleWarn, ChannelMessages.IDLE_TXN(idleTime),
-                                                     TransactionTimeoutHelper.IDLE_TRANSACTION_ALERT);
-            if (_transactionTimeoutHelper.isTimedOut(idleTime, idleClose))
-            {
-                closeConnection("Idle transaction timed out");
-                return;
-            }
-
-            _transactionTimeoutHelper.logIfNecessary(openTime, openWarn, ChannelMessages.OPEN_TXN(openTime),
-                                                     TransactionTimeoutHelper.OPEN_TRANSACTION_ALERT);
-            if (_transactionTimeoutHelper.isTimedOut(openTime, openClose))
-            {
-                closeConnection("Open transaction timed out");
-                return;
-            }
-        }
+        _transactionTimeoutHelper.checkIdleOrOpenTimes(_transaction, openWarn, openClose, idleWarn, idleClose);
     }
 
     /**
@@ -1628,14 +1584,8 @@ public class AMQChannel implements AMQSe
             _action.postCommit();
             _action = null;
         }
-
-        boolean isReadyForCompletion()
-        {
-            return _future.isComplete();
-        }
     }
 
-
     @Override
     public int getConsumerCount()
     {

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/TransactionTimeoutHelper.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/TransactionTimeoutHelper.java?rev=1429726&r1=1429725&r2=1429726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/TransactionTimeoutHelper.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/TransactionTimeoutHelper.java Mon Jan  7 10:01:25 2013
@@ -18,46 +18,85 @@
  */
 package org.apache.qpid.server;
 
-import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
 import org.apache.qpid.server.logging.LogActor;
 import org.apache.qpid.server.logging.LogMessage;
 import org.apache.qpid.server.logging.LogSubject;
 import org.apache.qpid.server.logging.actors.CurrentActor;
 import org.apache.qpid.server.logging.messages.ChannelMessages;
+import org.apache.qpid.server.txn.ServerTransaction;
 
 public class TransactionTimeoutHelper
 {
-    private static final Logger LOGGER = Logger.getLogger(TransactionTimeoutHelper.class);
-
-    public static final String IDLE_TRANSACTION_ALERT = "IDLE TRANSACTION ALERT";
-    public static final String OPEN_TRANSACTION_ALERT = "OPEN TRANSACTION ALERT";
+    private static final String OPEN_TRANSACTION_TIMEOUT_ERROR = "Open transaction timed out";
+    private static final String IDLE_TRANSACTION_TIMEOUT_ERROR = "Idle transaction timed out";
 
     private final LogSubject _logSubject;
 
-    public TransactionTimeoutHelper(final LogSubject logSubject)
+    private final CloseAction _closeAction;
+
+    public TransactionTimeoutHelper(final LogSubject logSubject, final CloseAction closeAction)
     {
         _logSubject = logSubject;
+        _closeAction = closeAction;
     }
 
-    public void logIfNecessary(final long timeSoFar, final long warnTimeout,
-                               final LogMessage message, final String alternateLogPrefix)
+    public void checkIdleOrOpenTimes(ServerTransaction transaction, long openWarn, long openClose, long idleWarn, long idleClose) throws AMQException
     {
-        if (isTimedOut(timeSoFar, warnTimeout))
+        if (transaction.isTransactional())
         {
-            LogActor logActor = CurrentActor.get();
-            if(logActor.getRootMessageLogger().isMessageEnabled(logActor, _logSubject, message.getLogHierarchy()))
+            final long transactionUpdateTime = transaction.getTransactionUpdateTime();
+            if(transactionUpdateTime > 0)
             {
-                logActor.message(_logSubject, message);
+                long idleTime = System.currentTimeMillis() - transactionUpdateTime;
+                boolean closed = logAndCloseIfNecessary(idleTime, idleWarn, idleClose, ChannelMessages.IDLE_TXN(idleTime), IDLE_TRANSACTION_TIMEOUT_ERROR);
+                if (closed)
+                {
+                    return; // no point proceeding to check the open time
+                }
             }
-            else
+
+            final long transactionStartTime = transaction.getTransactionStartTime();
+            if(transactionStartTime > 0)
             {
-                LOGGER.warn(alternateLogPrefix + " " + _logSubject.toLogString() + " " + timeSoFar + " ms");
+                long openTime = System.currentTimeMillis() - transactionStartTime;
+                logAndCloseIfNecessary(openTime, openWarn, openClose, ChannelMessages.OPEN_TXN(openTime), OPEN_TRANSACTION_TIMEOUT_ERROR);
             }
         }
     }
 
-    public boolean isTimedOut(long timeSoFar, long timeout)
+    /**
+     * @return true iff closeTimeout was exceeded
+     */
+    private boolean logAndCloseIfNecessary(final long timeSoFar,
+            final long warnTimeout, final long closeTimeout,
+            final LogMessage warnMessage, final String closeMessage) throws AMQException
+    {
+        if (isTimedOut(timeSoFar, warnTimeout))
+        {
+            LogActor logActor = CurrentActor.get();
+            logActor.message(_logSubject, warnMessage);
+        }
+
+        if(isTimedOut(timeSoFar, closeTimeout))
+        {
+            _closeAction.doTimeoutAction(closeMessage);
+            return true;
+        }
+        else
+        {
+            return false;
+        }
+    }
+
+    private boolean isTimedOut(long timeSoFar, long timeout)
     {
         return timeout > 0L && timeSoFar > timeout;
     }
+
+    public interface CloseAction
+    {
+        void doTimeoutAction(String reason) throws AMQException;
+    }
+
 }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java?rev=1429726&r1=1429725&r2=1429726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java Mon Jan  7 10:01:25 2013
@@ -80,7 +80,7 @@ public class ExchangeDestination impleme
             {
                 // NO-OP
             }
-        }, System.currentTimeMillis());
+        });
 
         return ACCEPTED;
     }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java?rev=1429726&r1=1429725&r2=1429726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java Mon Jan  7 10:01:25 2013
@@ -454,7 +454,7 @@ public abstract class QueueEntryImpl imp
                     {
 
                     }
-                }, 0L);
+                });
 
                 txn.dequeue(currentQueue, message, new ServerTransaction.Action()
                 {

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java?rev=1429726&r1=1429725&r2=1429726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java Mon Jan  7 10:01:25 2013
@@ -1398,7 +1398,7 @@ public class SimpleAMQQueue implements A
                                         {
 
                                         }
-                                    }, 0L);
+                                    });
                         txn.dequeue(this, entry.getMessage(),
                                     new ServerTransaction.Action()
                                     {

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java?rev=1429726&r1=1429725&r2=1429726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java Mon Jan  7 10:01:25 2013
@@ -43,6 +43,7 @@ import org.apache.qpid.AMQException;
 import org.apache.qpid.AMQStoreException;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.server.TransactionTimeoutHelper;
+import org.apache.qpid.server.TransactionTimeoutHelper.CloseAction;
 import org.apache.qpid.server.logging.LogActor;
 import org.apache.qpid.server.logging.LogSubject;
 import org.apache.qpid.server.logging.actors.CurrentActor;
@@ -132,7 +133,6 @@ public class ServerSession extends Sessi
     private final AtomicLong _txnCommits = new AtomicLong(0);
     private final AtomicLong _txnRejects = new AtomicLong(0);
     private final AtomicLong _txnCount = new AtomicLong(0);
-    private final AtomicLong _txnUpdateTime = new AtomicLong(0);
 
     private Map<String, Subscription_0_10> _subscriptions = new ConcurrentHashMap<String, Subscription_0_10>();
 
@@ -147,7 +147,14 @@ public class ServerSession extends Sessi
         _transaction = new AsyncAutoCommitTransaction(this.getMessageStore(),this);
         _logSubject = new ChannelLogSubject(this);
 
-        _transactionTimeoutHelper = new TransactionTimeoutHelper(_logSubject);
+        _transactionTimeoutHelper = new TransactionTimeoutHelper(_logSubject, new CloseAction()
+        {
+            @Override
+            public void doTimeoutAction(String reason) throws AMQException
+            {
+                getConnectionModel().closeSession(ServerSession.this, AMQConstant.RESOURCE_ERROR, reason);
+            }
+        });
     }
 
     protected void setState(State state)
@@ -186,9 +193,8 @@ public class ServerSession extends Sessi
         }
         getConnectionModel().registerMessageReceived(message.getSize(), message.getArrivalTime());
         PostEnqueueAction postTransactionAction = new PostEnqueueAction(queues, message, isTransactional()) ;
-        _transaction.enqueue(queues,message, postTransactionAction, 0L);
+        _transaction.enqueue(queues,message, postTransactionAction);
         incrementOutstandingTxnsIfNecessary();
-        updateTransactionalActivity();
     }
 
 
@@ -402,7 +408,6 @@ public class ServerSession extends Sessi
                                      entry.release();
                                  }
                              });
-	    updateTransactionalActivity();
     }
 
     public Collection<Subscription_0_10> getSubscriptions()
@@ -549,7 +554,6 @@ public class ServerSession extends Sessi
 
         _txnCommits.incrementAndGet();
         _txnStarts.incrementAndGet();
-        _txnUpdateTime.set(0);
         decrementOutstandingTxnsIfNecessary();
     }
 
@@ -559,7 +563,6 @@ public class ServerSession extends Sessi
 
         _txnRejects.incrementAndGet();
         _txnStarts.incrementAndGet();
-        _txnUpdateTime.set(0);
         decrementOutstandingTxnsIfNecessary();
     }
 
@@ -584,22 +587,6 @@ public class ServerSession extends Sessi
         }
     }
 
-    /**
-     * Update last transaction activity timestamp
-     */
-    private void updateTransactionalActivity()
-    {
-        if (isTransactional())
-        {
-            _txnUpdateTime.set(System.currentTimeMillis());
-        }
-    }
-
-    public Long getTxnStarts()
-    {
-        return _txnStarts.get();
-    }
-
     public Long getTxnCommits()
     {
         return _txnCommits.get();
@@ -705,30 +692,7 @@ public class ServerSession extends Sessi
 
     public void checkTransactionStatus(long openWarn, long openClose, long idleWarn, long idleClose) throws AMQException
     {
-        final long transactionStartTime = _transaction.getTransactionStartTime();
-        final long transactionUpdateTime = _txnUpdateTime.get();
-        if (isTransactional() && transactionUpdateTime > 0 && transactionStartTime > 0)
-        {
-            long currentTime = System.currentTimeMillis();
-            long openTime = currentTime - transactionStartTime;
-            long idleTime = currentTime - transactionUpdateTime;
-
-            _transactionTimeoutHelper.logIfNecessary(idleTime, idleWarn, ChannelMessages.IDLE_TXN(idleTime),
-                                                     TransactionTimeoutHelper.IDLE_TRANSACTION_ALERT);
-            if (_transactionTimeoutHelper.isTimedOut(idleTime, idleClose))
-            {
-                getConnectionModel().closeSession(this, AMQConstant.RESOURCE_ERROR, "Idle transaction timed out");
-                return;
-            }
-
-            _transactionTimeoutHelper.logIfNecessary(openTime, openWarn, ChannelMessages.OPEN_TXN(openTime),
-                                                     TransactionTimeoutHelper.OPEN_TRANSACTION_ALERT);
-            if (_transactionTimeoutHelper.isTimedOut(openTime, openClose))
-            {
-                getConnectionModel().closeSession(this, AMQConstant.RESOURCE_ERROR, "Open transaction timed out");
-                return;
-            }
-        }
+        _transactionTimeoutHelper.checkIdleOrOpenTimes(_transaction, openWarn, openClose, idleWarn, idleClose);
     }
 
     public void block(AMQQueue queue)

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java?rev=1429726&r1=1429725&r2=1429726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java Mon Jan  7 10:01:25 2013
@@ -66,11 +66,18 @@ public class AsyncAutoCommitTransaction 
         _futureRecorder = recorder;
     }
 
+    @Override
     public long getTransactionStartTime()
     {
         return 0L;
     }
 
+    @Override
+    public long getTransactionUpdateTime()
+    {
+        return 0L;
+    }
+
     /**
      * Since AutoCommitTransaction have no concept of a long lived transaction, any Actions registered
      * by the caller are executed immediately.
@@ -241,7 +248,7 @@ public class AsyncAutoCommitTransaction 
 
     }
 
-    public void enqueue(List<? extends BaseQueue> queues, EnqueableMessage message, Action postTransactionAction, long currentTime)
+    public void enqueue(List<? extends BaseQueue> queues, EnqueableMessage message, Action postTransactionAction)
     {
         Transaction txn = null;
         try

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java?rev=1429726&r1=1429725&r2=1429726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java Mon Jan  7 10:01:25 2013
@@ -52,11 +52,18 @@ public class AutoCommitTransaction imple
         _messageStore = transactionLog;
     }
 
+    @Override
     public long getTransactionStartTime()
     {
         return 0L;
     }
 
+    @Override
+    public long getTransactionUpdateTime()
+    {
+        return 0L;
+    }
+
     /**
      * Since AutoCommitTransaction have no concept of a long lived transaction, any Actions registered
      * by the caller are executed immediately.
@@ -178,7 +185,7 @@ public class AutoCommitTransaction imple
 
     }
 
-    public void enqueue(List<? extends BaseQueue> queues, EnqueableMessage message, Action postTransactionAction, long currentTime)
+    public void enqueue(List<? extends BaseQueue> queues, EnqueableMessage message, Action postTransactionAction)
     {
         Transaction txn = null;
         try
@@ -270,4 +277,6 @@ public class AutoCommitTransaction imple
         }
     }
 
+
+
 }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DistributedTransaction.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DistributedTransaction.java?rev=1429726&r1=1429725&r2=1429726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DistributedTransaction.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DistributedTransaction.java Mon Jan  7 10:01:25 2013
@@ -26,7 +26,6 @@ import org.apache.qpid.server.protocol.A
 import org.apache.qpid.server.queue.BaseQueue;
 import org.apache.qpid.server.queue.QueueEntry;
 import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.Transaction;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.transport.Xid;
 
@@ -39,10 +38,6 @@ public class DistributedTransaction impl
 
     private final AutoCommitTransaction _autoCommitTransaction;
 
-    private volatile Transaction _transaction;
-
-    private long _txnStartTime = 0L;
-
     private DtxBranch _branch;
     private AMQSessionModel _session;
     private VirtualHost _vhost;
@@ -55,9 +50,16 @@ public class DistributedTransaction impl
         _autoCommitTransaction = new AutoCommitTransaction(vhost.getMessageStore());
     }
 
+    @Override
     public long getTransactionStartTime()
     {
-        return _txnStartTime;
+        return 0;
+    }
+
+    @Override
+    public long getTransactionUpdateTime()
+    {
+        return 0;
     }
 
     public void addPostTransactionAction(Action postTransactionAction)
@@ -107,7 +109,7 @@ public class DistributedTransaction impl
         {
             _branch.enqueue(queue, message);
             _branch.addPostTransactionAcion(postTransactionAction);
-            enqueue(Collections.singletonList(queue), message, postTransactionAction, System.currentTimeMillis());
+            enqueue(Collections.singletonList(queue), message, postTransactionAction);
         }
         else
         {
@@ -116,7 +118,7 @@ public class DistributedTransaction impl
     }
 
     public void enqueue(List<? extends BaseQueue> queues, EnqueableMessage message,
-                        Action postTransactionAction, long currentTime)
+                        Action postTransactionAction)
     {
         if(_branch != null)
         {
@@ -128,7 +130,7 @@ public class DistributedTransaction impl
         }
         else
         {
-            _autoCommitTransaction.enqueue(queues, message, postTransactionAction, currentTime);
+            _autoCommitTransaction.enqueue(queues, message, postTransactionAction);
         }
     }
 

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java?rev=1429726&r1=1429725&r2=1429726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java Mon Jan  7 10:01:25 2013
@@ -49,25 +49,42 @@ public class LocalTransaction implements
     private final List<Action> _postTransactionActions = new ArrayList<Action>();
 
     private volatile Transaction _transaction;
-    private MessageStore _transactionLog;
+    private final ActivityTimeAccessor _activityTime;
+    private final MessageStore _transactionLog;
     private volatile long _txnStartTime = 0L;
+    private volatile long _txnUpdateTime = 0l;
     private StoreFuture _asyncTran;
 
     public LocalTransaction(MessageStore transactionLog)
     {
-        _transactionLog = transactionLog;
+        this(transactionLog, new ActivityTimeAccessor()
+        {
+            @Override
+            public long getActivityTime()
+            {
+                return System.currentTimeMillis();
+            }
+        });
     }
-    
-    public boolean inTransaction()
+
+    public LocalTransaction(MessageStore transactionLog, ActivityTimeAccessor activityTime)
     {
-        return _transaction != null;
+        _transactionLog = transactionLog;
+        _activityTime = activityTime;
     }
 
+    @Override
     public long getTransactionStartTime()
     {
         return _txnStartTime;
     }
 
+    @Override
+    public long getTransactionUpdateTime()
+    {
+        return _txnUpdateTime;
+    }
+
     public void addPostTransactionAction(Action postTransactionAction)
     {
         sync();
@@ -78,6 +95,7 @@ public class LocalTransaction implements
     {
         sync();
         _postTransactionActions.add(postTransactionAction);
+        initTransactionStartTimeIfNecessaryAndAdvanceUpdateTime();
 
         if(message.isPersistent() && queue.isDurable())
         {
@@ -104,6 +122,7 @@ public class LocalTransaction implements
     {
         sync();
         _postTransactionActions.add(postTransactionAction);
+        initTransactionStartTimeIfNecessaryAndAdvanceUpdateTime();
 
         try
         {
@@ -180,6 +199,7 @@ public class LocalTransaction implements
     {
         sync();
         _postTransactionActions.add(postTransactionAction);
+        initTransactionStartTimeIfNecessaryAndAdvanceUpdateTime();
 
         if(message.isPersistent() && queue.isDurable())
         {
@@ -189,7 +209,7 @@ public class LocalTransaction implements
                 {
                     _logger.debug("Enqueue of message number " + message.getMessageNumber() + " to transaction log. Queue : " + queue.getNameShortString());
                 }
-                
+
                 beginTranIfNecessary();
                 _transaction.enqueueMessage(queue, message);
             }
@@ -202,15 +222,11 @@ public class LocalTransaction implements
         }
     }
 
-    public void enqueue(List<? extends BaseQueue> queues, EnqueableMessage message, Action postTransactionAction, long currentTime)
+    public void enqueue(List<? extends BaseQueue> queues, EnqueableMessage message, Action postTransactionAction)
     {
         sync();
         _postTransactionActions.add(postTransactionAction);
-
-        if (_txnStartTime == 0L)
-        {
-            _txnStartTime = currentTime == 0L ? System.currentTimeMillis() : currentTime;
-        }
+        initTransactionStartTimeIfNecessaryAndAdvanceUpdateTime();
 
         if(message.isPersistent())
         {
@@ -224,8 +240,7 @@ public class LocalTransaction implements
                         {
                             _logger.debug("Enqueue of message number " + message.getMessageNumber() + " to transaction log. Queue : " + queue.getNameShortString() );
                         }
-                        
-                        
+
                         beginTranIfNecessary();
                         _transaction.enqueueMessage(queue, message);
                     }
@@ -378,8 +393,6 @@ public class LocalTransaction implements
             }
             throw new RuntimeException("Failed to commit transaction", e);
         }
-
-
     }
 
     private void doPostTransactionActions()
@@ -437,16 +450,34 @@ public class LocalTransaction implements
         }
     }
 
+    private void initTransactionStartTimeIfNecessaryAndAdvanceUpdateTime()
+    {
+        long currentTime = _activityTime.getActivityTime();
+
+        if (_txnStartTime == 0)
+        {
+            _txnStartTime = currentTime;
+        }
+        _txnUpdateTime = currentTime;
+    }
+
     private void resetDetails()
     {
         _asyncTran = null;
         _transaction = null;
-	    _postTransactionActions.clear();
+        _postTransactionActions.clear();
         _txnStartTime = 0L;
+        _txnUpdateTime = 0;
     }
 
     public boolean isTransactional()
     {
         return true;
     }
+
+    public interface ActivityTimeAccessor
+    {
+        long getActivityTime();
+    }
+
 }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java?rev=1429726&r1=1429725&r2=1429726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java Mon Jan  7 10:01:25 2013
@@ -55,11 +55,18 @@ public interface ServerTransaction
 
     /**
      * Return the time the current transaction started.
-     * 
+     *
      * @return the time this transaction started or 0 if not in a transaction
      */
     long getTransactionStartTime();
 
+    /**
+     * Return the time of the last activity on the current transaction.
+     *
+     * @return the time of the last activity or 0 if not in a transaction
+     */
+    long getTransactionUpdateTime();
+
     /** 
      * Register an Action for execution after transaction commit or rollback.  Actions
      * will be executed in the order in which they are registered.
@@ -92,7 +99,7 @@ public interface ServerTransaction
      * 
      * Store operations will result only for a persistent messages on durable queues.
      */
-    void enqueue(List<? extends BaseQueue> queues, EnqueableMessage message, Action postTransactionAction, long currentTime);
+    void enqueue(List<? extends BaseQueue> queues, EnqueableMessage message, Action postTransactionAction);
 
     /** 
      * Commit the transaction represented by this object.

Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/TransactionTimeoutHelperTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/TransactionTimeoutHelperTest.java?rev=1429726&r1=1429725&r2=1429726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/TransactionTimeoutHelperTest.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/TransactionTimeoutHelperTest.java Mon Jan  7 10:01:25 2013
@@ -18,67 +18,131 @@
  */
 package org.apache.qpid.server;
 
-import static org.mockito.Matchers.any;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.qpid.server.logging.messages.ChannelMessages.IDLE_TXN_LOG_HIERARCHY;
+import static org.apache.qpid.server.logging.messages.ChannelMessages.OPEN_TXN_LOG_HIERARCHY;
+import static org.mockito.Matchers.argThat;
 import static org.mockito.Matchers.same;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.verifyZeroInteractions;
 import static org.mockito.Mockito.when;
 
+import org.apache.qpid.server.TransactionTimeoutHelper.CloseAction;
 import org.apache.qpid.server.logging.LogActor;
 import org.apache.qpid.server.logging.LogMessage;
 import org.apache.qpid.server.logging.LogSubject;
-import org.apache.qpid.server.logging.RootMessageLogger;
 import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.txn.ServerTransaction;
 import org.apache.qpid.test.utils.QpidTestCase;
+import org.hamcrest.Description;
+import org.mockito.ArgumentMatcher;
 
 public class TransactionTimeoutHelperTest extends QpidTestCase
 {
-    private final LogMessage _logMessage = mock(LogMessage.class);
     private final LogActor _logActor = mock(LogActor.class);
     private final LogSubject _logSubject = mock(LogSubject.class);
+    private final ServerTransaction _transaction = mock(ServerTransaction.class);
+    private final CloseAction _closeAction = mock(CloseAction.class);
     private TransactionTimeoutHelper _transactionTimeoutHelper;
-    private RootMessageLogger _rootMessageLogger;
+    private long _now;
 
-    public void testLogIfNecessary()
+    public void testNotTransactional() throws Exception
     {
-        _transactionTimeoutHelper.logIfNecessary(99, 100, _logMessage, "");
-        verifyZeroInteractions(_logActor, _logMessage);
+        when(_transaction.isTransactional()).thenReturn(false);
 
-        _transactionTimeoutHelper.logIfNecessary(101, 100, _logMessage, "");
-        verify(_logActor).message(_logSubject, _logMessage);
+        _transactionTimeoutHelper.checkIdleOrOpenTimes(_transaction, 5, 10, 5, 10);
+
+        verifyZeroInteractions(_logActor, _closeAction);
+    }
+
+    public void testOpenTransactionProducesWarningOnly() throws Exception
+    {
+        final long sixtyOneSecondsAgo = _now - SECONDS.toMillis(61);
+
+        configureMockTransaction(sixtyOneSecondsAgo, sixtyOneSecondsAgo);
+
+        _transactionTimeoutHelper.checkIdleOrOpenTimes(_transaction, SECONDS.toMillis(30), 0, 0, 0);
+
+        verify(_logActor).message(same(_logSubject), isLogMessage(OPEN_TXN_LOG_HIERARCHY, "CHN-1007 : Open Transaction : 61,\\d{3} ms"));
+        verifyZeroInteractions(_closeAction);
+    }
+
+    public void testOpenTransactionProducesTimeoutActionOnly() throws Exception
+    {
+        final long sixtyOneSecondsAgo = _now - SECONDS.toMillis(61);
+
+        configureMockTransaction(sixtyOneSecondsAgo, sixtyOneSecondsAgo);
+
+        _transactionTimeoutHelper.checkIdleOrOpenTimes(_transaction, 0, SECONDS.toMillis(30), 0, 0);
+
+        verify(_closeAction).doTimeoutAction("Open transaction timed out");
+        verifyZeroInteractions(_logActor);
+    }
+
+    public void testOpenTransactionProducesWarningAndTimeoutAction() throws Exception
+    {
+        final long sixtyOneSecondsAgo = _now - SECONDS.toMillis(61);
+
+        configureMockTransaction(sixtyOneSecondsAgo, sixtyOneSecondsAgo);
+
+        _transactionTimeoutHelper.checkIdleOrOpenTimes(_transaction, SECONDS.toMillis(15), SECONDS.toMillis(30), 0, 0);
+
+        verify(_logActor).message(same(_logSubject), isLogMessage(OPEN_TXN_LOG_HIERARCHY, "CHN-1007 : Open Transaction : 61,\\d{3} ms"));
+        verify(_closeAction).doTimeoutAction("Open transaction timed out");
+    }
+
+    public void testIdleTransactionProducesWarningOnly() throws Exception
+    {
+        final long sixtyOneSecondsAgo = _now - SECONDS.toMillis(61);
+        final long thrityOneSecondsAgo = _now - SECONDS.toMillis(31);
+
+        configureMockTransaction(sixtyOneSecondsAgo, thrityOneSecondsAgo);
+
+        _transactionTimeoutHelper.checkIdleOrOpenTimes(_transaction, 0, 0, SECONDS.toMillis(30), 0);
+
+        verify(_logActor).message(same(_logSubject), isLogMessage(IDLE_TXN_LOG_HIERARCHY, "CHN-1008 : Idle Transaction : 31,\\d{3} ms"));
+        verifyZeroInteractions(_closeAction);
     }
 
-    public void testLogIfNecessaryWhenOperationalLoggingDisabled()
+    public void testIdleTransactionProducesTimeoutActionOnly() throws Exception
     {
-        //disable the operational logging
-        when(_rootMessageLogger.isMessageEnabled(
-            same(_logActor), any(LogSubject.class), any(String.class)))
-            .thenReturn(false);
+        final long sixtyOneSecondsAgo = _now - SECONDS.toMillis(61);
+        final long thrityOneSecondsAgo = _now - SECONDS.toMillis(31);
+
+        configureMockTransaction(sixtyOneSecondsAgo, thrityOneSecondsAgo);
+
+        _transactionTimeoutHelper.checkIdleOrOpenTimes(_transaction, 0, 0, 0, SECONDS.toMillis(30));
 
-        //verify the actor is never asked to log a message
-        _transactionTimeoutHelper.logIfNecessary(101, 100, _logMessage, "");
-        verify(_logActor, never()).message(any(LogMessage.class));
-        verify(_logActor, never()).message(any(LogSubject.class), any(LogMessage.class));
+        verify(_closeAction).doTimeoutAction("Idle transaction timed out");
+        verifyZeroInteractions(_logActor);
     }
 
-    public void testIsTimedOut()
+    public void testIdleTransactionProducesWarningAndTimeoutAction() throws Exception
     {
-        assertFalse("Shouldn't have timed out", _transactionTimeoutHelper.isTimedOut(199,200));
-        assertTrue("Should have timed out", _transactionTimeoutHelper.isTimedOut(201,200));
+        final long sixtyOneSecondsAgo = _now - SECONDS.toMillis(61);
+        final long thrityOneSecondsAgo = _now - SECONDS.toMillis(31);
+
+        configureMockTransaction(sixtyOneSecondsAgo, thrityOneSecondsAgo);
+
+        _transactionTimeoutHelper.checkIdleOrOpenTimes(_transaction, 0, 0, SECONDS.toMillis(15), SECONDS.toMillis(30));
+
+        verify(_logActor).message(same(_logSubject), isLogMessage(IDLE_TXN_LOG_HIERARCHY, "CHN-1008 : Idle Transaction : 31,\\d{3} ms"));
+        verify(_closeAction).doTimeoutAction("Idle transaction timed out");
     }
 
-    /**
-     * If TransactionTimeout is disabled, the timeout will be 0. This test verifies
-     * that the helper methods respond negatively in this scenario.
-     */
-    public void testTransactionTimeoutDisabled()
+    public void testIdleAndOpenWarnings() throws Exception
     {
-        assertFalse("Shouldn't have timed out", _transactionTimeoutHelper.isTimedOut(201,0));
+        final long sixtyOneSecondsAgo = _now - SECONDS.toMillis(61);
+        final long thirtyOneSecondsAgo = _now - SECONDS.toMillis(31);
+
+        configureMockTransaction(sixtyOneSecondsAgo, thirtyOneSecondsAgo);
+
+        _transactionTimeoutHelper.checkIdleOrOpenTimes(_transaction, SECONDS.toMillis(60), 0, SECONDS.toMillis(30), 0);
 
-        _transactionTimeoutHelper.logIfNecessary(99, 0, _logMessage, "");
-        verifyZeroInteractions(_logActor, _logMessage);
+        verify(_logActor).message(same(_logSubject), isLogMessage(IDLE_TXN_LOG_HIERARCHY, "CHN-1008 : Idle Transaction : 31,\\d{3} ms"));
+        verify(_logActor).message(same(_logSubject), isLogMessage(OPEN_TXN_LOG_HIERARCHY, "CHN-1007 : Open Transaction : 61,\\d{3} ms"));
+        verifyZeroInteractions(_closeAction);
     }
 
     @Override
@@ -88,14 +152,79 @@ public class TransactionTimeoutHelperTes
 
         CurrentActor.set(_logActor);
 
-        _rootMessageLogger = mock(RootMessageLogger.class);
-        when(_logActor.getRootMessageLogger()).thenReturn(_rootMessageLogger);
+        _transactionTimeoutHelper = new TransactionTimeoutHelper(_logSubject, _closeAction);
+        _now = System.currentTimeMillis();
+    }
+
+    @Override
+    protected void tearDown() throws Exception
+    {
+        try
+        {
+            super.tearDown();
+        }
+        finally
+        {
+            CurrentActor.remove();
+        }
+    }
 
-        when(_rootMessageLogger.isMessageEnabled(
-                same(_logActor), any(LogSubject.class), any(String.class)))
-                .thenReturn(true);
+    private void configureMockTransaction(final long startTime, final long updateTime)
+    {
+        when(_transaction.isTransactional()).thenReturn(true);
+        when(_transaction.getTransactionStartTime()).thenReturn(startTime);
+        when(_transaction.getTransactionUpdateTime()).thenReturn(updateTime);
+    }
 
-        _transactionTimeoutHelper = new TransactionTimeoutHelper(_logSubject);
+    private LogMessage isLogMessage(String expectedlogHierarchy, String expectedText)
+    {
+        return argThat(new IsLogMessage(expectedlogHierarchy, expectedText));
     }
 
+    class IsLogMessage extends ArgumentMatcher<LogMessage>
+    {
+        private final String _expectedLogHierarchy;
+        private final String _expectedLogMessageMatches;
+        private String _hierarchyMatchesFailure;
+        private String _logMessageMatchesFailure;
+
+        public IsLogMessage(String expectedlogHierarchy, String expectedLogMessageMatches)
+        {
+            _expectedLogHierarchy = expectedlogHierarchy;
+            _expectedLogMessageMatches = expectedLogMessageMatches;
+        }
+
+        public boolean matches(Object arg)
+        {
+            LogMessage logMessage = (LogMessage)arg;
+
+            boolean hierarchyMatches = logMessage.getLogHierarchy().equals(_expectedLogHierarchy);
+            boolean logMessageMatches = logMessage.toString().matches(_expectedLogMessageMatches);
+
+            if (!hierarchyMatches)
+            {
+                _hierarchyMatchesFailure = "LogHierarchy does not match. Expected " + _expectedLogHierarchy + " actual " + logMessage.getLogHierarchy();
+            }
+
+            if (!logMessageMatches)
+            {
+                _logMessageMatchesFailure = "LogMessage does not match. Expected " + _expectedLogMessageMatches + " actual " + logMessage.toString();
+            }
+
+            return hierarchyMatches && logMessageMatches;
+        }
+
+        @Override
+        public void describeTo(Description description)
+        {
+            if (_hierarchyMatchesFailure != null)
+            {
+                description.appendText(_hierarchyMatchesFailure);
+            }
+            if (_logMessageMatchesFailure != null)
+            {
+                description.appendText(_logMessageMatchesFailure);
+            }
+        }
+    }
 }

Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java?rev=1429726&r1=1429725&r2=1429726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java Mon Jan  7 10:01:25 2013
@@ -659,7 +659,7 @@ public class SimpleAMQQueueTest extends 
                                         public void onRollback()
                                         {
                                         }
-                                    }, 0L);
+                                    });
 
         // Check that it is enqueued
         AMQQueue data = store.getMessages().get(1L);

Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java?rev=1429726&r1=1429725&r2=1429726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java Mon Jan  7 10:01:25 2013
@@ -630,7 +630,7 @@ public class MessageStoreTest extends In
                 {
                     //To change body of implemented methods use File | Settings | File Templates.
                 }
-            }, 0L);
+            });
         }
     }
 

Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/AsyncAutoCommitTransactionTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/AsyncAutoCommitTransactionTest.java?rev=1429726&r1=1429725&r2=1429726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/AsyncAutoCommitTransactionTest.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/AsyncAutoCommitTransactionTest.java Mon Jan  7 10:01:25 2013
@@ -82,7 +82,7 @@ public class AsyncAutoCommitTransactionT
         AsyncAutoCommitTransaction asyncAutoCommitTransaction =
                 new AsyncAutoCommitTransaction(_messageStore, _futureRecorder);
 
-        asyncAutoCommitTransaction.enqueue(Collections.singletonList(_queue), _message, _postTransactionAction, System.currentTimeMillis());
+        asyncAutoCommitTransaction.enqueue(Collections.singletonList(_queue), _message, _postTransactionAction);
 
         verify(_storeTransaction).enqueueMessage(_queue, _message);
         verify(_futureRecorder).recordFuture(_future, _postTransactionAction);

Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/AutoCommitTransactionTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/AutoCommitTransactionTest.java?rev=1429726&r1=1429725&r2=1429726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/AutoCommitTransactionTest.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/AutoCommitTransactionTest.java Mon Jan  7 10:01:25 2013
@@ -137,7 +137,7 @@ public class AutoCommitTransactionTest e
         _message = createTestMessage(false);
         _queues = createTestBaseQueues(new boolean[] {false, false, false});
         
-        _transaction.enqueue(_queues, _message, _action, 0L);
+        _transaction.enqueue(_queues, _message, _action);
 
         assertEquals("Enqueue of non-persistent message must not cause message to be enqueued", 0, _storeTransaction.getNumberOfEnqueuedMessages());
         assertEquals("Unexpected transaction state", TransactionState.NOT_STARTED, _storeTransaction.getState());
@@ -157,7 +157,7 @@ public class AutoCommitTransactionTest e
         _message = createTestMessage(true);
         _queues = createTestBaseQueues(new boolean[] {false, false, false});
         
-        _transaction.enqueue(_queues, _message, _action, 0L);
+        _transaction.enqueue(_queues, _message, _action);
 
         assertEquals("Enqueue of persistent message to non-durable queues must not cause message to be enqueued", 0, _storeTransaction.getNumberOfEnqueuedMessages());
         assertEquals("Unexpected transaction state", TransactionState.NOT_STARTED, _storeTransaction.getState());
@@ -175,7 +175,7 @@ public class AutoCommitTransactionTest e
         _message = createTestMessage(true);
         _queues = createTestBaseQueues(new boolean[] {false, true, false, true});
         
-        _transaction.enqueue(_queues, _message, _action, 0L);
+        _transaction.enqueue(_queues, _message, _action);
 
         assertEquals("Enqueue of persistent message to durable/non-durable queues must cause messages to be enqueued", 2, _storeTransaction.getNumberOfEnqueuedMessages());
         assertEquals("Unexpected transaction state", TransactionState.COMMITTED, _storeTransaction.getState());
@@ -198,7 +198,7 @@ public class AutoCommitTransactionTest e
         
         try
         {
-            _transaction.enqueue(_queues, _message, _action, 0L);
+            _transaction.enqueue(_queues, _message, _action);
             fail("Exception not thrown");
         }
         catch (RuntimeException re)

Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java?rev=1429726&r1=1429725&r2=1429726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java Mon Jan  7 10:01:25 2013
@@ -140,7 +140,7 @@ public class LocalTransactionTest extend
         _message = createTestMessage(false);
         _queues = createTestBaseQueues(new boolean[] {false, false, false});
         
-        _transaction.enqueue(_queues, _message, _action1, 0L);
+        _transaction.enqueue(_queues, _message, _action1);
 
         assertEquals("Enqueue of non-persistent message must not cause message to be enqueued", 0, _storeTransaction.getNumberOfEnqueuedMessages());
         assertEquals("Unexpected transaction state", TransactionState.NOT_STARTED, _storeTransaction.getState());
@@ -156,7 +156,7 @@ public class LocalTransactionTest extend
         _message = createTestMessage(true);
         _queues = createTestBaseQueues(new boolean[] {false, false, false});
         
-        _transaction.enqueue(_queues, _message, _action1, 0L);
+        _transaction.enqueue(_queues, _message, _action1);
   
         assertEquals("Enqueue of persistent message to non-durable queues must not cause message to be enqueued", 0, _storeTransaction.getNumberOfEnqueuedMessages());
         assertEquals("Unexpected transaction state", TransactionState.NOT_STARTED, _storeTransaction.getState());
@@ -173,7 +173,7 @@ public class LocalTransactionTest extend
         _message = createTestMessage(true);
         _queues = createTestBaseQueues(new boolean[] {false, true, false, true});
         
-        _transaction.enqueue(_queues, _message, _action1, 0L);
+        _transaction.enqueue(_queues, _message, _action1);
 
         assertEquals("Enqueue of persistent message to durable/non-durable queues must cause messages to be enqueued", 2, _storeTransaction.getNumberOfEnqueuedMessages());
         assertEquals("Unexpected transaction state", TransactionState.STARTED, _storeTransaction.getState());
@@ -196,7 +196,7 @@ public class LocalTransactionTest extend
         
         try
         {
-            _transaction.enqueue(_queues, _message, _action1, 0L);
+            _transaction.enqueue(_queues, _message, _action1);
             fail("Exception not thrown");
         }
         catch (RuntimeException re)
@@ -217,7 +217,7 @@ public class LocalTransactionTest extend
     {
         _message = createTestMessage(false);
         _queue = createTestAMQQueue(false);
-        
+
         _transaction.dequeue(_queue, _message, _action1);
 
         assertEquals("Dequeue of non-persistent message must not cause message to be enqueued", 0, _storeTransaction.getNumberOfEnqueuedMessages());
@@ -465,7 +465,6 @@ public class LocalTransactionTest extend
      */
     public void testRollbackWorkWithAdditionalPostAction() throws Exception
     {
-        
         _message = createTestMessage(true);
         _queue = createTestAMQQueue(true);
         
@@ -482,6 +481,122 @@ public class LocalTransactionTest extend
         assertTrue("Rollback action2 must be fired", _action1.isRollbackActionFired());
     }
 
+    public void testFirstEnqueueRecordsTransactionStartAndUpdateTime() throws Exception
+    {
+        assertEquals("Unexpected transaction start time before test", 0, _transaction.getTransactionStartTime());
+        assertEquals("Unexpected transaction update time before test", 0, _transaction.getTransactionUpdateTime());
+
+        _message = createTestMessage(true);
+        _queue = createTestAMQQueue(true);
+
+        long startTime = System.currentTimeMillis();
+        _transaction.enqueue(_queue, _message, _action1);
+
+        assertTrue("Transaction start time should have been recorded", _transaction.getTransactionStartTime() >= startTime);
+        assertEquals("Transaction update time should be the same as transaction start time", _transaction.getTransactionStartTime(), _transaction.getTransactionUpdateTime());
+    }
+
+    public void testSubsequentEnqueueAdvancesTransactionUpdateTimeOnly() throws Exception
+    {
+        assertEquals("Unexpected transaction start time before test", 0, _transaction.getTransactionStartTime());
+        assertEquals("Unexpected transaction update time before test", 0, _transaction.getTransactionUpdateTime());
+
+        _message = createTestMessage(true);
+        _queue = createTestAMQQueue(true);
+
+        _transaction.enqueue(_queue, _message, _action1);
+
+        final long transactionStartTimeAfterFirstEnqueue = _transaction.getTransactionStartTime();
+        final long transactionUpdateTimeAfterFirstEnqueue = _transaction.getTransactionUpdateTime();
+
+        Thread.sleep(1);
+        _transaction.enqueue(_queue, _message, _action2);
+
+        final long transactionStartTimeAfterSecondEnqueue = _transaction.getTransactionStartTime();
+        final long transactionUpdateTimeAfterSecondEnqueue = _transaction.getTransactionUpdateTime();
+
+        assertEquals("Transaction start time after second enqueue should be unchanged", transactionStartTimeAfterFirstEnqueue, transactionStartTimeAfterSecondEnqueue);
+        assertTrue("Transaction update time after second enqueue should be greater than first update time", transactionUpdateTimeAfterSecondEnqueue > transactionUpdateTimeAfterFirstEnqueue);
+    }
+
+    public void testFirstDequeueRecordsTransactionStartAndUpdateTime() throws Exception
+    {
+        assertEquals("Unexpected transaction start time before test", 0, _transaction.getTransactionStartTime());
+        assertEquals("Unexpected transaction update time before test", 0, _transaction.getTransactionUpdateTime());
+
+        _message = createTestMessage(true);
+        _queue = createTestAMQQueue(true);
+
+        long startTime = System.currentTimeMillis();
+        _transaction.dequeue(_queue, _message, _action1);
+
+        assertTrue("Transaction start time should have been recorded", _transaction.getTransactionStartTime() >= startTime);
+        assertEquals("Transaction update time should be the same as transaction start time", _transaction.getTransactionStartTime(), _transaction.getTransactionUpdateTime());
+    }
+
+    public void testMixedEnqueuesAndDequeuesAdvancesTransactionUpdateTimeOnly() throws Exception
+    {
+        assertEquals("Unexpected transaction start time before test", 0, _transaction.getTransactionStartTime());
+        assertEquals("Unexpected transaction update time before test", 0, _transaction.getTransactionUpdateTime());
+
+        _message = createTestMessage(true);
+        _queue = createTestAMQQueue(true);
+
+        _transaction.enqueue(_queue, _message, _action1);
+
+        final long transactionStartTimeAfterFirstEnqueue = _transaction.getTransactionStartTime();
+        final long transactionUpdateTimeAfterFirstEnqueue = _transaction.getTransactionUpdateTime();
+
+        Thread.sleep(1);
+        _transaction.dequeue(_queue, _message, _action2);
+
+        final long transactionStartTimeAfterFirstDequeue = _transaction.getTransactionStartTime();
+        final long transactionUpdateTimeAfterFirstDequeue = _transaction.getTransactionUpdateTime();
+
+        assertEquals("Transaction start time after first dequeue should be unchanged", transactionStartTimeAfterFirstEnqueue, transactionStartTimeAfterFirstDequeue);
+        assertTrue("Transaction update time after first dequeue should be greater than first update time", transactionUpdateTimeAfterFirstDequeue > transactionUpdateTimeAfterFirstEnqueue);
+    }
+
+    public void testCommitResetsTransactionStartAndUpdateTime() throws Exception
+    {
+        assertEquals("Unexpected transaction start time before test", 0, _transaction.getTransactionStartTime());
+        assertEquals("Unexpected transaction update time before test", 0, _transaction.getTransactionUpdateTime());
+
+        _message = createTestMessage(true);
+        _queue = createTestAMQQueue(true);
+
+        long startTime = System.currentTimeMillis();
+        _transaction.enqueue(_queue, _message, _action1);
+
+        assertTrue(_transaction.getTransactionStartTime() >= startTime);
+        assertTrue(_transaction.getTransactionUpdateTime() >= startTime);
+
+        _transaction.commit();
+
+        assertEquals("Transaction start time should be reset after commit", 0, _transaction.getTransactionStartTime());
+        assertEquals("Transaction update time should be reset after commit", 0, _transaction.getTransactionUpdateTime());
+    }
+
+    public void testRollbackResetsTransactionStartAndUpdateTime() throws Exception
+    {
+        assertEquals("Unexpected transaction start time before test", 0, _transaction.getTransactionStartTime());
+        assertEquals("Unexpected transaction update time before test", 0, _transaction.getTransactionUpdateTime());
+
+        _message = createTestMessage(true);
+        _queue = createTestAMQQueue(true);
+
+        long startTime = System.currentTimeMillis();
+        _transaction.enqueue(_queue, _message, _action1);
+
+        assertTrue(_transaction.getTransactionStartTime() >= startTime);
+        assertTrue(_transaction.getTransactionUpdateTime() >= startTime);
+
+        _transaction.rollback();
+
+        assertEquals("Transaction start time should be reset after rollback", 0, _transaction.getTransactionStartTime());
+        assertEquals("Transaction update time should be reset after rollback", 0, _transaction.getTransactionUpdateTime());
+    }
+
     private Collection<QueueEntry> createTestQueueEntries(boolean[] queueDurableFlags, boolean[] messagePersistentFlags)
     {
         Collection<QueueEntry> queueEntries = new ArrayList<QueueEntry>();

Modified: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTest.java?rev=1429726&r1=1429725&r2=1429726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTest.java (original)
+++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTest.java Mon Jan  7 10:01:25 2013
@@ -39,7 +39,7 @@ public class TransactionTimeoutTest exte
 
     protected void configure() throws Exception
     {
-        // Setup housekeeping every second
+        // Setup housekeeping every 100ms
         setConfigurationProperty("virtualhosts.virtualhost." + VIRTUALHOST + ".housekeeping.checkPeriod", "100");
 
         if (getName().contains("ProducerIdle"))

Modified: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTestCase.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTestCase.java?rev=1429726&r1=1429725&r2=1429726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTestCase.java (original)
+++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/TransactionTimeoutTestCase.java Mon Jan  7 10:01:25 2013
@@ -23,17 +23,13 @@ package org.apache.qpid.test.unit.transa
 import junit.framework.TestCase;
 
 import org.apache.qpid.AMQException;
-import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.AMQConnectionURL;
-import org.apache.qpid.client.AMQQueue;
 import org.apache.qpid.client.AMQSession;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.jms.ConnectionURL;
-import org.apache.qpid.jms.Session;
+import org.apache.qpid.configuration.ClientProperties;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.test.utils.QpidBrokerTestCase;
 import org.apache.qpid.util.LogMonitor;
 
+import javax.jms.Connection;
 import javax.jms.DeliveryMode;
 import javax.jms.ExceptionListener;
 import javax.jms.JMSException;
@@ -41,6 +37,7 @@ import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
 import javax.jms.Queue;
+import javax.jms.Session;
 import javax.jms.TextMessage;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
@@ -61,7 +58,7 @@ public abstract class TransactionTimeout
     public static final String OPEN = "Open";
     
     protected LogMonitor _monitor;
-    protected AMQConnection _con;
+    protected Connection _con;
     protected Session _psession, _csession;
     protected Queue _queue;
     protected MessageConsumer _consumer;
@@ -89,16 +86,14 @@ public abstract class TransactionTimeout
         super.setUp();
         
         // Connect to broker
-        String broker = ("tcp://localhost:" + DEFAULT_PORT);
-        ConnectionURL url = new AMQConnectionURL("amqp://guest:guest@clientid/test?brokerlist='" + broker + "'&maxprefetch='1'");
-        _con = (AMQConnection) getConnection(url);
+        setTestClientSystemProperty(ClientProperties.MAX_PREFETCH_PROP_NAME, String.valueOf(1));
+        _con = getConnection();
         _con.setExceptionListener(this);
         _con.start();
         
         // Create queue
         Session qsession = _con.createSession(true, Session.SESSION_TRANSACTED);
-        AMQShortString queueName = new AMQShortString("test");
-        _queue = new AMQQueue(qsession.getDefaultQueueExchangeName(), queueName, queueName, false, true);
+        _queue = qsession.createQueue(getTestQueueName());
         qsession.close();
         
         // Create producer and consumer



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org