You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2012/07/10 15:56:46 UTC

svn commit: r1359673 - in /qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server: AMQChannel.java handler/TxCommitHandler.java protocol/AMQProtocolEngine.java txn/LocalTransaction.java

Author: rgodfrey
Date: Tue Jul 10 13:56:45 2012
New Revision: 1359673

URL: http://svn.apache.org/viewvc?rev=1359673&view=rev
Log:
QPID-4125 : [Java Broker] allow coalescing of commits for multiple channels on same connection

Modified:
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?rev=1359673&r1=1359672&r2=1359673&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java Tue Jul 10 13:56:45 2012
@@ -952,9 +952,11 @@ public class AMQChannel implements Sessi
 
     public void commit() throws AMQException
     {
-        commit(null);
+        commit(null, false);
     }
-    public void commit(Runnable immediateAction) throws AMQException
+
+
+    public void commit(final Runnable immediateAction, boolean async) throws AMQException
     {
 
         if (!isTransactional())
@@ -962,11 +964,29 @@ public class AMQChannel implements Sessi
             throw new AMQException("Fatal error: commit called on non-transactional channel");
         }
 
-        _transaction.commit(immediateAction);
+        if(async && _transaction instanceof LocalTransaction)
+        {
 
-        _txnCommits.incrementAndGet();
-        _txnStarts.incrementAndGet();
-        decrementOutstandingTxnsIfNecessary();
+            ((LocalTransaction)_transaction).commitAsync(new Runnable()
+            {
+                @Override
+                public void run()
+                {
+                    immediateAction.run();
+                    _txnCommits.incrementAndGet();
+                    _txnStarts.incrementAndGet();
+                    decrementOutstandingTxnsIfNecessary();
+                }
+            });
+        }
+        else
+        {
+            _transaction.commit(immediateAction);
+
+            _txnCommits.incrementAndGet();
+            _txnStarts.incrementAndGet();
+            decrementOutstandingTxnsIfNecessary();
+        }
     }
 
     public void rollback() throws AMQException
@@ -1624,6 +1644,10 @@ public class AMQChannel implements Sessi
             cmd.awaitReadyForCompletion();
             cmd.complete();
         }
+        if(_transaction instanceof LocalTransaction)
+        {
+            ((LocalTransaction)_transaction).sync();
+        }
     }
 
     private static class AsyncCommand

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java?rev=1359673&r1=1359672&r2=1359673&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java Tue Jul 10 13:56:45 2012
@@ -46,9 +46,9 @@ public class TxCommitHandler implements 
     {
     }
 
-    public void methodReceived(AMQStateManager stateManager, TxCommitBody body, int channelId) throws AMQException
+    public void methodReceived(AMQStateManager stateManager, TxCommitBody body, final int channelId) throws AMQException
     {
-        AMQProtocolSession session = stateManager.getProtocolSession();
+        final AMQProtocolSession session = stateManager.getProtocolSession();
 
         try
         {
@@ -62,11 +62,19 @@ public class TxCommitHandler implements 
             {
                 throw body.getChannelNotFoundException(channelId);
             }
-            channel.commit();
+            channel.commit(new Runnable()
+            {
+
+                @Override
+                public void run()
+                {
+                    MethodRegistry methodRegistry = session.getMethodRegistry();
+                    AMQMethodBody responseBody = methodRegistry.createTxCommitOkBody();
+                    session.writeFrame(responseBody.generateFrame(channelId));
+                }
+            }, true);
+
 
-            MethodRegistry methodRegistry = session.getMethodRegistry();
-            AMQMethodBody responseBody = methodRegistry.createTxCommitOkBody();
-            session.writeFrame(responseBody.generateFrame(channelId));
                         
         }
         catch (AMQException e)

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java?rev=1359673&r1=1359672&r2=1359673&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java Tue Jul 10 13:56:45 2012
@@ -779,6 +779,9 @@ public class AMQProtocolEngine implement
     {
         if(_closing.compareAndSet(false,true))
         {
+            // force sync of outstanding async work
+            receiveComplete();
+
             // REMOVE THIS SHOULD NOT BE HERE.
             if (CurrentActor.get() == null)
             {

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java?rev=1359673&r1=1359672&r2=1359673&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java Tue Jul 10 13:56:45 2012
@@ -20,6 +20,7 @@
  */
 package org.apache.qpid.server.txn;
 
+import org.apache.qpid.server.store.StoreFuture;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -50,6 +51,7 @@ public class LocalTransaction implements
     private volatile Transaction _transaction;
     private MessageStore _transactionLog;
     private long _txnStartTime = 0L;
+    private StoreFuture _asyncTran;
 
     public LocalTransaction(MessageStore transactionLog)
     {
@@ -68,11 +70,13 @@ public class LocalTransaction implements
 
     public void addPostTransactionAction(Action postTransactionAction)
     {
+        sync();
         _postTransactionActions.add(postTransactionAction);
     }
 
     public void dequeue(BaseQueue queue, EnqueableMessage message, Action postTransactionAction)
     {
+        sync();
         _postTransactionActions.add(postTransactionAction);
 
         if(message.isPersistent() && queue.isDurable())
@@ -98,6 +102,7 @@ public class LocalTransaction implements
 
     public void dequeue(Collection<QueueEntry> queueEntries, Action postTransactionAction)
     {
+        sync();
         _postTransactionActions.add(postTransactionAction);
 
         try
@@ -131,10 +136,7 @@ public class LocalTransaction implements
     {
         try
         {
-            for(Action action : _postTransactionActions)
-            {
-                action.onRollback();
-            }
+            doRollbackActions();
         }
         finally
         {
@@ -151,7 +153,7 @@ public class LocalTransaction implements
             }
             finally
             {
-		resetDetails();
+		        resetDetails();
             }
         }
 
@@ -176,6 +178,7 @@ public class LocalTransaction implements
 
     public void enqueue(BaseQueue queue, EnqueableMessage message, Action postTransactionAction)
     {
+        sync();
         _postTransactionActions.add(postTransactionAction);
 
         if(message.isPersistent() && queue.isDurable())
@@ -201,6 +204,7 @@ public class LocalTransaction implements
 
     public void enqueue(List<? extends BaseQueue> queues, EnqueableMessage message, Action postTransactionAction, long currentTime)
     {
+        sync();
         _postTransactionActions.add(postTransactionAction);
 
         if (_txnStartTime == 0L)
@@ -239,11 +243,13 @@ public class LocalTransaction implements
 
     public void commit()
     {
+        sync();
         commit(null);
     }
 
     public void commit(Runnable immediateAction)
     {
+        sync();
         try
         {
             if(_transaction != null)
@@ -256,29 +262,137 @@ public class LocalTransaction implements
                 immediateAction.run();
             }
 
-            for(int i = 0; i < _postTransactionActions.size(); i++)
+            doPostTransactionActions();
+        }
+        catch (Exception e)
+        {
+            _logger.error("Failed to commit transaction", e);
+
+            doRollbackActions();
+            throw new RuntimeException("Failed to commit transaction", e);
+        }
+        finally
+        {
+            resetDetails();
+        }
+    }
+
+    private void doRollbackActions()
+    {
+        for(Action action : _postTransactionActions)
+        {
+            action.onRollback();
+        }
+    }
+
+    public StoreFuture commitAsync(final Runnable deferred)
+    {
+        sync();
+        try
+        {
+            StoreFuture future = StoreFuture.IMMEDIATE_FUTURE;
+            if(_transaction != null)
             {
-                _postTransactionActions.get(i).postCommit();
+                future = new StoreFuture()
+                        {
+                            private volatile boolean _completed = false;
+                            private StoreFuture _underlying = _transaction.commitTranAsync();
+
+                            @Override
+                            public boolean isComplete()
+                            {
+                                return _completed || checkUnderlyingCompletion();
+                            }
+
+                            @Override
+                            public void waitForCompletion()
+                            {
+                                if(!_completed)
+                                {
+                                    _underlying.waitForCompletion();
+                                    checkUnderlyingCompletion();
+                                }
+                            }
+
+                            private synchronized boolean checkUnderlyingCompletion()
+                            {
+                                if(!_completed && _underlying.isComplete())
+                                {
+                                    completeDeferredWork();
+                                    _completed = true;
+                                }
+                                return _completed;
+
+                            }
+
+                            private void completeDeferredWork()
+                            {
+                                try
+                                {
+                                    doPostTransactionActions();
+                                    deferred.run();
+
+                                }
+                                catch (Exception e)
+                                {
+                                    _logger.error("Failed to commit transaction", e);
+
+                                    doRollbackActions();
+                                    throw new RuntimeException("Failed to commit transaction", e);
+                                }
+                                finally
+                                {
+                                    resetDetails();
+                                }
+                            }
+
+                };
+                _asyncTran = future;
             }
+            else
+            {
+                try
+                {
+                    doPostTransactionActions();
+
+                    deferred.run();
+                }
+                finally
+                {
+                    resetDetails();
+                }
+            }
+
+            return future;
         }
         catch (Exception e)
         {
             _logger.error("Failed to commit transaction", e);
-
-            for(Action action : _postTransactionActions)
+            try
             {
-                action.onRollback();
+                doRollbackActions();
+            }
+            finally
+            {
+                resetDetails();
             }
             throw new RuntimeException("Failed to commit transaction", e);
         }
-        finally
+
+
+    }
+
+    private void doPostTransactionActions()
+    {
+        for(int i = 0; i < _postTransactionActions.size(); i++)
         {
-            resetDetails();
+            _postTransactionActions.get(i).postCommit();
         }
     }
 
     public void rollback()
     {
+        sync();
         try
         {
             if(_transaction != null)
@@ -295,10 +409,7 @@ public class LocalTransaction implements
         {
             try
             {
-                for(Action action : _postTransactionActions)
-                {
-                    action.onRollback();
-                }
+                doRollbackActions();
             }
             finally
             {
@@ -306,9 +417,19 @@ public class LocalTransaction implements
             }
         }
     }
-    
+
+    public void sync()
+    {
+        if(_asyncTran != null)
+        {
+            _asyncTran.waitForCompletion();
+            _asyncTran = null;
+        }
+    }
+
     private void resetDetails()
     {
+        _asyncTran = null;
         _transaction = null;
 	    _postTransactionActions.clear();
         _txnStartTime = 0L;



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org