You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2012/07/10 15:56:46 UTC
svn commit: r1359673 - in
/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server:
AMQChannel.java handler/TxCommitHandler.java
protocol/AMQProtocolEngine.java txn/LocalTransaction.java
Author: rgodfrey
Date: Tue Jul 10 13:56:45 2012
New Revision: 1359673
URL: http://svn.apache.org/viewvc?rev=1359673&view=rev
Log:
QPID-4125 : [Java Broker] allow coalescing of commits for multiple channels on same connection
Modified:
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?rev=1359673&r1=1359672&r2=1359673&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java Tue Jul 10 13:56:45 2012
@@ -952,9 +952,11 @@ public class AMQChannel implements Sessi
public void commit() throws AMQException
{
- commit(null);
+ commit(null, false);
}
- public void commit(Runnable immediateAction) throws AMQException
+
+
+ public void commit(final Runnable immediateAction, boolean async) throws AMQException
{
if (!isTransactional())
@@ -962,11 +964,29 @@ public class AMQChannel implements Sessi
throw new AMQException("Fatal error: commit called on non-transactional channel");
}
- _transaction.commit(immediateAction);
+ if(async && _transaction instanceof LocalTransaction)
+ {
- _txnCommits.incrementAndGet();
- _txnStarts.incrementAndGet();
- decrementOutstandingTxnsIfNecessary();
+ ((LocalTransaction)_transaction).commitAsync(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ immediateAction.run();
+ _txnCommits.incrementAndGet();
+ _txnStarts.incrementAndGet();
+ decrementOutstandingTxnsIfNecessary();
+ }
+ });
+ }
+ else
+ {
+ _transaction.commit(immediateAction);
+
+ _txnCommits.incrementAndGet();
+ _txnStarts.incrementAndGet();
+ decrementOutstandingTxnsIfNecessary();
+ }
}
public void rollback() throws AMQException
@@ -1624,6 +1644,10 @@ public class AMQChannel implements Sessi
cmd.awaitReadyForCompletion();
cmd.complete();
}
+ if(_transaction instanceof LocalTransaction)
+ {
+ ((LocalTransaction)_transaction).sync();
+ }
}
private static class AsyncCommand
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java?rev=1359673&r1=1359672&r2=1359673&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java Tue Jul 10 13:56:45 2012
@@ -46,9 +46,9 @@ public class TxCommitHandler implements
{
}
- public void methodReceived(AMQStateManager stateManager, TxCommitBody body, int channelId) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, TxCommitBody body, final int channelId) throws AMQException
{
- AMQProtocolSession session = stateManager.getProtocolSession();
+ final AMQProtocolSession session = stateManager.getProtocolSession();
try
{
@@ -62,11 +62,19 @@ public class TxCommitHandler implements
{
throw body.getChannelNotFoundException(channelId);
}
- channel.commit();
+ channel.commit(new Runnable()
+ {
+
+ @Override
+ public void run()
+ {
+ MethodRegistry methodRegistry = session.getMethodRegistry();
+ AMQMethodBody responseBody = methodRegistry.createTxCommitOkBody();
+ session.writeFrame(responseBody.generateFrame(channelId));
+ }
+ }, true);
+
- MethodRegistry methodRegistry = session.getMethodRegistry();
- AMQMethodBody responseBody = methodRegistry.createTxCommitOkBody();
- session.writeFrame(responseBody.generateFrame(channelId));
}
catch (AMQException e)
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java?rev=1359673&r1=1359672&r2=1359673&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java Tue Jul 10 13:56:45 2012
@@ -779,6 +779,9 @@ public class AMQProtocolEngine implement
{
if(_closing.compareAndSet(false,true))
{
+ // force sync of outstanding async work
+ receiveComplete();
+
// REMOVE THIS SHOULD NOT BE HERE.
if (CurrentActor.get() == null)
{
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java?rev=1359673&r1=1359672&r2=1359673&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java Tue Jul 10 13:56:45 2012
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.server.txn;
+import org.apache.qpid.server.store.StoreFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -50,6 +51,7 @@ public class LocalTransaction implements
private volatile Transaction _transaction;
private MessageStore _transactionLog;
private long _txnStartTime = 0L;
+ private StoreFuture _asyncTran;
public LocalTransaction(MessageStore transactionLog)
{
@@ -68,11 +70,13 @@ public class LocalTransaction implements
public void addPostTransactionAction(Action postTransactionAction)
{
+ sync();
_postTransactionActions.add(postTransactionAction);
}
public void dequeue(BaseQueue queue, EnqueableMessage message, Action postTransactionAction)
{
+ sync();
_postTransactionActions.add(postTransactionAction);
if(message.isPersistent() && queue.isDurable())
@@ -98,6 +102,7 @@ public class LocalTransaction implements
public void dequeue(Collection<QueueEntry> queueEntries, Action postTransactionAction)
{
+ sync();
_postTransactionActions.add(postTransactionAction);
try
@@ -131,10 +136,7 @@ public class LocalTransaction implements
{
try
{
- for(Action action : _postTransactionActions)
- {
- action.onRollback();
- }
+ doRollbackActions();
}
finally
{
@@ -151,7 +153,7 @@ public class LocalTransaction implements
}
finally
{
- resetDetails();
+ resetDetails();
}
}
@@ -176,6 +178,7 @@ public class LocalTransaction implements
public void enqueue(BaseQueue queue, EnqueableMessage message, Action postTransactionAction)
{
+ sync();
_postTransactionActions.add(postTransactionAction);
if(message.isPersistent() && queue.isDurable())
@@ -201,6 +204,7 @@ public class LocalTransaction implements
public void enqueue(List<? extends BaseQueue> queues, EnqueableMessage message, Action postTransactionAction, long currentTime)
{
+ sync();
_postTransactionActions.add(postTransactionAction);
if (_txnStartTime == 0L)
@@ -239,11 +243,13 @@ public class LocalTransaction implements
public void commit()
{
+ sync();
commit(null);
}
public void commit(Runnable immediateAction)
{
+ sync();
try
{
if(_transaction != null)
@@ -256,29 +262,137 @@ public class LocalTransaction implements
immediateAction.run();
}
- for(int i = 0; i < _postTransactionActions.size(); i++)
+ doPostTransactionActions();
+ }
+ catch (Exception e)
+ {
+ _logger.error("Failed to commit transaction", e);
+
+ doRollbackActions();
+ throw new RuntimeException("Failed to commit transaction", e);
+ }
+ finally
+ {
+ resetDetails();
+ }
+ }
+
+ private void doRollbackActions()
+ {
+ for(Action action : _postTransactionActions)
+ {
+ action.onRollback();
+ }
+ }
+
+ public StoreFuture commitAsync(final Runnable deferred)
+ {
+ sync();
+ try
+ {
+ StoreFuture future = StoreFuture.IMMEDIATE_FUTURE;
+ if(_transaction != null)
{
- _postTransactionActions.get(i).postCommit();
+ future = new StoreFuture()
+ {
+ private volatile boolean _completed = false;
+ private StoreFuture _underlying = _transaction.commitTranAsync();
+
+ @Override
+ public boolean isComplete()
+ {
+ return _completed || checkUnderlyingCompletion();
+ }
+
+ @Override
+ public void waitForCompletion()
+ {
+ if(!_completed)
+ {
+ _underlying.waitForCompletion();
+ checkUnderlyingCompletion();
+ }
+ }
+
+ private synchronized boolean checkUnderlyingCompletion()
+ {
+ if(!_completed && _underlying.isComplete())
+ {
+ completeDeferredWork();
+ _completed = true;
+ }
+ return _completed;
+
+ }
+
+ private void completeDeferredWork()
+ {
+ try
+ {
+ doPostTransactionActions();
+ deferred.run();
+
+ }
+ catch (Exception e)
+ {
+ _logger.error("Failed to commit transaction", e);
+
+ doRollbackActions();
+ throw new RuntimeException("Failed to commit transaction", e);
+ }
+ finally
+ {
+ resetDetails();
+ }
+ }
+
+ };
+ _asyncTran = future;
}
+ else
+ {
+ try
+ {
+ doPostTransactionActions();
+
+ deferred.run();
+ }
+ finally
+ {
+ resetDetails();
+ }
+ }
+
+ return future;
}
catch (Exception e)
{
_logger.error("Failed to commit transaction", e);
-
- for(Action action : _postTransactionActions)
+ try
{
- action.onRollback();
+ doRollbackActions();
+ }
+ finally
+ {
+ resetDetails();
}
throw new RuntimeException("Failed to commit transaction", e);
}
- finally
+
+
+ }
+
+ private void doPostTransactionActions()
+ {
+ for(int i = 0; i < _postTransactionActions.size(); i++)
{
- resetDetails();
+ _postTransactionActions.get(i).postCommit();
}
}
public void rollback()
{
+ sync();
try
{
if(_transaction != null)
@@ -295,10 +409,7 @@ public class LocalTransaction implements
{
try
{
- for(Action action : _postTransactionActions)
- {
- action.onRollback();
- }
+ doRollbackActions();
}
finally
{
@@ -306,9 +417,19 @@ public class LocalTransaction implements
}
}
}
-
+
+ public void sync()
+ {
+ if(_asyncTran != null)
+ {
+ _asyncTran.waitForCompletion();
+ _asyncTran = null;
+ }
+ }
+
private void resetDetails()
{
+ _asyncTran = null;
_transaction = null;
_postTransactionActions.clear();
_txnStartTime = 0L;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org