You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2009/10/11 17:10:48 UTC
svn commit: r824084 [1/5] - in /qpid/branches/java-broker-0-10/qpid/java:
broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/
broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/example/
broker/src/main/java/org/apache/q...
Author: rgodfrey
Date: Sun Oct 11 15:10:43 2009
New Revision: 824084
URL: http://svn.apache.org/viewvc?rev=824084&view=rev
Log:
Migrate 0-8 code to use same Txn code as 0-10
Added:
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/message/EnqueableMessage.java (with props)
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java (with props)
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLog.java (with props)
Removed:
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/StoreMessageOperation.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionalContext.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/TxnBuffer.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/TxnOp.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/txn/TxnBufferTest.java
Modified:
qpid/branches/java-broker-0-10/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java
qpid/branches/java-broker-0-10/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/example/TestExchange.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/ExtractResendAndRequeue.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessageReference.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageReference.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/message/ServerMessage.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/message/TransferMessageReference.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/output/ProtocolOutputConverter.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/Transaction.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Purge.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/ExtractResendAndRequeueTest.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessage.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessageHandle.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java
qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/client/SimpleConnectionTest.java
qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/BindingLoggingTest.java
qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java
qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java
Modified: qpid/branches/java-broker-0-10/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java?rev=824084&r1=824083&r2=824084&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java Sun Oct 11 15:10:43 2009
@@ -197,7 +197,7 @@
return false;
}
- public ArrayList<AMQQueue> route(InboundMessage payload) throws AMQException
+ public ArrayList<AMQQueue> route(InboundMessage payload)
{
Long value = new Long(SizeOf.getUsedMemory());
Modified: qpid/branches/java-broker-0-10/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/example/TestExchange.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/example/TestExchange.java?rev=824084&r1=824083&r2=824084&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/example/TestExchange.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/example/TestExchange.java Sun Oct 11 15:10:43 2009
@@ -113,7 +113,7 @@
{
}
- public ArrayList<AMQQueue> route(InboundMessage message) throws AMQException
+ public ArrayList<AMQQueue> route(InboundMessage message)
{
return new ArrayList<AMQQueue>();
}
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java?rev=824084&r1=824083&r2=824084&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java Sun Oct 11 15:10:43 2009
@@ -63,7 +63,7 @@
import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.server.queue.AMQQueueMBean;
import org.apache.qpid.server.queue.QueueRegistry;
-import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.actors.ManagementActor;
@@ -78,7 +78,7 @@
private final QueueRegistry _queueRegistry;
private final ExchangeRegistry _exchangeRegistry;
private final ExchangeFactory _exchangeFactory;
- private final MessageStore _messageStore;
+ private final DurableConfigurationStore _durableConfig;
private final VirtualHost.VirtualHostMBean _virtualHostMBean;
@@ -92,7 +92,7 @@
_queueRegistry = virtualHost.getQueueRegistry();
_exchangeRegistry = virtualHost.getExchangeRegistry();
- _messageStore = virtualHost.getMessageStore();
+ _durableConfig = virtualHost.getDurableConfigurationStore();
_exchangeFactory = virtualHost.getExchangeFactory();
}
@@ -277,7 +277,7 @@
null);
if (queue.isDurable() && !queue.isAutoDelete())
{
- _messageStore.createQueue(queue);
+ _durableConfig.createQueue(queue);
}
_queueRegistry.registerQueue(queue);
@@ -319,7 +319,7 @@
queue.delete();
if (queue.isDurable())
{
- _messageStore.removeQueue(queue);
+ _durableConfig.removeQueue(queue);
}
}
catch (AMQException ex)
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?rev=824084&r1=824083&r2=824084&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java Sun Oct 11 15:10:43 2009
@@ -20,24 +20,20 @@
*/
package org.apache.qpid.server;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
+import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.framing.*;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.framing.abstraction.ContentChunk;
import org.apache.qpid.server.ack.UnacknowledgedMessageMap;
import org.apache.qpid.server.ack.UnacknowledgedMessageMapImpl;
import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.exchange.NoRouteException;
import org.apache.qpid.server.flow.FlowCreditManager;
import org.apache.qpid.server.flow.Pre0_10CreditManager;
import org.apache.qpid.server.protocol.AMQProtocolSession;
@@ -47,14 +43,11 @@
import org.apache.qpid.server.subscription.ClientDeliveryMethod;
import org.apache.qpid.server.subscription.RecordDeliveryMethod;
import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.StoreContext;
-import org.apache.qpid.server.txn.LocalTransactionalContext;
-import org.apache.qpid.server.txn.NonTransactionalContext;
-import org.apache.qpid.server.txn.TransactionalContext;
+import org.apache.qpid.server.txn.*;
import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.logging.LogSubject;
-import org.apache.qpid.server.logging.LogMessage;
import org.apache.qpid.server.logging.messages.ChannelMessages;
import org.apache.qpid.server.logging.subjects.ChannelLogSubject;
import org.apache.qpid.server.logging.actors.AMQPChannelActor;
@@ -64,7 +57,11 @@
{
public static final int DEFAULT_PREFETCH = 5000;
- private static final Logger _log = Logger.getLogger(AMQChannel.class);
+ private static final Logger _logger = Logger.getLogger(AMQChannel.class);
+
+ private static final boolean MSG_AUTH =
+ ApplicationRegistry.getInstance().getConfiguration().getMsgAuth();
+
private final int _channelId;
@@ -97,15 +94,13 @@
private UnacknowledgedMessageMap _unacknowledgedMessageMap = new UnacknowledgedMessageMapImpl(DEFAULT_PREFETCH);
+ // Set of messages being acknoweledged in the current transaction
+ private SortedSet<QueueEntry> _acknowledgedMessages = new TreeSet<QueueEntry>();
+
private final AtomicBoolean _suspended = new AtomicBoolean(false);
- private TransactionalContext _txnContext;
+ private Transaction _transaction;
- /**
- * A context used by the message store enabling it to track context for a given channel even across thread
- * boundaries
- */
- private final StoreContext _storeContext;
private final List<RequiredDeliveryException> _returnMessages = new LinkedList<RequiredDeliveryException>();
@@ -122,6 +117,10 @@
private LogActor _actor;
private LogSubject _logSubject;
+ private volatile boolean _rollingBack;
+
+ private static final Runnable NULL_TASK = new Runnable() { public void run() {} };
+ private List<QueueEntry> _resendList = new ArrayList<QueueEntry>();
public AMQChannel(AMQProtocolSession session, int channelId, MessageStore messageStore)
throws AMQException
@@ -131,22 +130,19 @@
_actor = new AMQPChannelActor(this, session.getLogActor().getRootMessageLogger());
_logSubject = new ChannelLogSubject(this);
-
- _actor.message(ChannelMessages.CHN_1001());
-
- _storeContext = new StoreContext("Session: " + session.getClientIdentifier() + "; channel: " + channelId);
+ _actor.message(ChannelMessages.CHN_1001());
_messageStore = messageStore;
// by default the session is non-transactional
- _txnContext = new NonTransactionalContext(_messageStore, _storeContext, this, _returnMessages);
+ _transaction = new AutoCommitTransaction(_messageStore);
}
/** Sets this channel to be part of a local transaction */
public void setLocalTransactional()
{
- _txnContext = new LocalTransactionalContext(this);
+ _transaction = new LocalTransaction(_messageStore);
}
public boolean isTransactional()
@@ -154,7 +150,7 @@
// this does not look great but there should only be one "non-transactional"
// transactional context, while there could be several transactional ones in
// theory
- return !(_txnContext instanceof NonTransactionalContext);
+ return !(_transaction instanceof AutoCommitTransaction);
}
public int getChannelId()
@@ -165,43 +161,61 @@
public void setPublishFrame(MessagePublishInfo info, final Exchange e) throws AMQException
{
- _currentMessage = new IncomingMessage(_messageStore.getNewMessageId(), info, _txnContext, _session);
- _currentMessage.setMessageStore(_messageStore);
+ _currentMessage = new IncomingMessage(_messageStore.getNewMessageId(), info, _session);
_currentMessage.setExchange(e);
}
public void publishContentHeader(ContentHeaderBody contentHeaderBody)
throws AMQException
{
- StoreContext.setCurrentContext(_storeContext);
- try
+ if (_currentMessage == null)
{
- if (_currentMessage == null)
+ throw new AMQException("Received content header without previously receiving a BasicPublish frame");
+ }
+ else
+ {
+ if (_logger.isDebugEnabled())
{
- throw new AMQException("Received content header without previously receiving a BasicPublish frame");
+ _logger.debug("Content header received on channel " + _channelId);
}
- else
- {
- if (_log.isDebugEnabled())
- {
- _log.debug("Content header received on channel " + _channelId);
- }
- _currentMessage.setContentHeaderBody(contentHeaderBody);
+ _currentMessage.setContentHeaderBody(contentHeaderBody);
+
+ _currentMessage.setExpiration();
- _currentMessage.setExpiration();
+ routeCurrentMessage();
- routeCurrentMessage();
+ MessageMetaData mmd = _currentMessage.routingComplete(_messageStore, _messageHandleFactory);
- _currentMessage.routingComplete(_messageStore, _messageHandleFactory);
+ if(_currentMessage.isPersistent())
+ {
+ final Long messageNumber = _currentMessage.getMessageNumber();
+
+ _messageStore.storeMessageMetaData(messageNumber, mmd);
- deliverCurrentMessageIfComplete();
+ _transaction.addPostCommitAction(new Transaction.Action()
+ {
+
+ public void postCommit()
+ {
+ }
+ public void onRollback()
+ {
+ try
+ {
+ _messageStore.removeMessage(messageNumber);
+ }
+ catch (AMQException e)
+ {
+
+ }
+ }
+ });
}
- }
- finally
- {
- StoreContext.clearCurrentContext();
+
+ deliverCurrentMessageIfComplete();
+
}
}
@@ -213,21 +227,36 @@
{
try
{
- _currentMessage.deliverToQueues();
- }
- catch (NoRouteException e)
- {
- _returnMessages.add(e);
- }
- catch(UnauthorizedAccessException ex)
- {
- _returnMessages.add(ex);
+
+ final ArrayList<AMQQueue> destinationQueues = _currentMessage.getDestinationQueues();
+
+ if(!checkMessageUserId(_currentMessage.getContentHeader()))
+ {
+ _transaction.addPostCommitAction(new WriteReturnAction(AMQConstant.ACCESS_REFUSED, "Access Refused", createAMQMessage(_currentMessage)));
+ }
+ else
+ {
+ if(destinationQueues == null || _currentMessage.getDestinationQueues().isEmpty())
+ {
+ if (_currentMessage.isMandatory() || _currentMessage.isImmediate())
+ {
+ _transaction.addPostCommitAction(new WriteReturnAction(AMQConstant.NO_ROUTE, "No Route for message", createAMQMessage(_currentMessage)));
+ }
+ else
+ {
+ _logger.warn("MESSAGE DISCARDED: No routes for message - " + createAMQMessage(_currentMessage));
+ }
+
+ }
+ else
+ {
+ _transaction.enqueue(destinationQueues, _currentMessage, new MessageDeliveryAction(_currentMessage, destinationQueues));
+
+ }
+ }
}
finally
{
- // callback to allow the context to do any post message processing
- // primary use is to allow message return processing in the non-tx case
- _txnContext.messageProcessed(_session);
_currentMessage = null;
}
}
@@ -236,15 +265,14 @@
public void publishContentBody(ContentBody contentBody) throws AMQException
{
- StoreContext.setCurrentContext(_storeContext);
if (_currentMessage == null)
{
throw new AMQException("Received content body without previously receiving a JmsPublishBody");
}
- if (_log.isDebugEnabled())
+ if (_logger.isDebugEnabled())
{
- _log.debug(debugIdentity() + "Content body received on channel " + _channelId);
+ _logger.debug(debugIdentity() + "Content body received on channel " + _channelId);
}
try
@@ -252,10 +280,19 @@
// returns true iff the message was delivered (i.e. if all data was
// received
- _currentMessage.addContentBodyFrame(
- _session.getMethodRegistry().getProtocolVersionMethodConverter().convertToContentChunk(
- contentBody));
+ final ContentChunk contentChunk =
+ _session.getMethodRegistry().getProtocolVersionMethodConverter().convertToContentChunk(contentBody);
+
+ int chunkId = _currentMessage.addContentBodyFrame(contentChunk);
+ if(_currentMessage.isPersistent())
+ {
+ final Long messageNumber = _currentMessage.getMessageNumber();
+ _messageStore.storeContentBodyChunk(messageNumber, chunkId,
+ contentChunk, _currentMessage.allContentReceived());
+
+
+ }
deliverCurrentMessageIfComplete();
}
@@ -266,23 +303,11 @@
_currentMessage = null;
throw e;
}
- finally
- {
- StoreContext.clearCurrentContext();
- }
}
protected void routeCurrentMessage() throws AMQException
{
- try
- {
- _currentMessage.route();
- }
- catch (NoRouteException e)
- {
- //_currentMessage.incrementReference();
- _returnMessages.add(e);
- }
+ _currentMessage.route();
}
public long getNextDeliveryTag()
@@ -359,12 +384,12 @@
Subscription sub = _tag2SubscriptionMap.remove(consumerTag);
if (sub != null)
{
- try
+ try
{
sub.getSendLock();
sub.getQueue().unregisterSubscription(sub);
}
- finally
+ finally
{
sub.releaseSendLock();
}
@@ -372,7 +397,7 @@
}
else
{
- _log.warn("Attempt to unsubscribe consumer with tag '"+consumerTag+"' which is not registered.");
+ _logger.warn("Attempt to unsubscribe consumer with tag '"+consumerTag+"' which is not registered.");
}
return false;
}
@@ -384,15 +409,17 @@
*/
public void close() throws AMQException
{
- _txnContext.rollback();
+
unsubscribeAllConsumers();
+ _transaction.rollback();
+
try
{
requeue();
}
catch (AMQException e)
{
- _log.error("Caught AMQException whilst attempting to reque:" + e);
+ _logger.error("Caught AMQException whilst attempting to reque:" + e);
}
setClosing(true);
@@ -407,23 +434,23 @@
private void unsubscribeAllConsumers() throws AMQException
{
- if (_log.isInfoEnabled())
+ if (_logger.isInfoEnabled())
{
if (!_tag2SubscriptionMap.isEmpty())
{
- _log.info("Unsubscribing all consumers on channel " + toString());
+ _logger.info("Unsubscribing all consumers on channel " + toString());
}
else
{
- _log.info("No consumers to unsubscribe on channel " + toString());
+ _logger.info("No consumers to unsubscribe on channel " + toString());
}
}
for (Map.Entry<AMQShortString, Subscription> me : _tag2SubscriptionMap.entrySet())
{
- if (_log.isInfoEnabled())
+ if (_logger.isInfoEnabled())
{
- _log.info("Unsubscribing consumer '" + me.getKey() + "' on channel " + toString());
+ _logger.info("Unsubscribing consumer '" + me.getKey() + "' on channel " + toString());
}
Subscription sub = me.getValue();
@@ -437,7 +464,7 @@
{
sub.releaseSendLock();
}
-
+
}
_tag2SubscriptionMap.clear();
@@ -453,17 +480,17 @@
*/
public void addUnacknowledgedMessage(QueueEntry entry, long deliveryTag, Subscription subscription)
{
- if (_log.isDebugEnabled())
+ if (_logger.isDebugEnabled())
{
if (entry.getQueue() == null)
{
- _log.debug("Adding unacked message with a null queue:" + entry);
+ _logger.debug("Adding unacked message with a null queue:" + entry);
}
else
{
- if (_log.isDebugEnabled())
+ if (_logger.isDebugEnabled())
{
- _log.debug(debugIdentity() + " Adding unacked message(" + entry.getMessage().toString() + " DT:" + deliveryTag
+ _logger.debug(debugIdentity() + " Adding unacked message(" + entry.getMessage().toString() + " DT:" + deliveryTag
+ ") with a queue(" + entry.getQueue() + ") for " + subscription);
}
}
@@ -491,27 +518,13 @@
// we must create a new map since all the messages will get a new delivery tag when they are redelivered
Collection<QueueEntry> messagesToBeDelivered = _unacknowledgedMessageMap.cancelAllMessages();
- // Deliver these messages out of the transaction as their delivery was never
- // part of the transaction only the receive.
- TransactionalContext deliveryContext = null;
-
if (!messagesToBeDelivered.isEmpty())
{
- if (_log.isInfoEnabled())
+ if (_logger.isInfoEnabled())
{
- _log.info("Requeuing " + messagesToBeDelivered.size() + " unacked messages. for " + toString());
+ _logger.info("Requeuing " + messagesToBeDelivered.size() + " unacked messages. for " + toString());
}
- if (!(_txnContext instanceof NonTransactionalContext))
- {
-
- deliveryContext =
- new NonTransactionalContext(_messageStore, _storeContext, this, _returnMessages);
- }
- else
- {
- deliveryContext = _txnContext;
- }
}
for (QueueEntry unacked : messagesToBeDelivered)
@@ -524,13 +537,10 @@
// Ensure message is released for redelivery
unacked.release();
- // Deliver Message
- deliveryContext.requeue(unacked);
-
}
else
{
- unacked.discard(_storeContext);
+ unacked.discard();
}
}
@@ -555,43 +565,22 @@
// Ensure message is released for redelivery
if (!unacked.isQueueDeleted())
{
- unacked.release();
- }
-
- // Deliver these messages out of the transaction as their delivery was never
- // part of the transaction only the receive.
- TransactionalContext deliveryContext;
- if (!(_txnContext instanceof NonTransactionalContext))
- {
-
- deliveryContext =
- new NonTransactionalContext(_messageStore, _storeContext, this, _returnMessages);
-
- }
- else
- {
- deliveryContext = _txnContext;
- }
+ // Ensure message is released for redelivery
+ unacked.release();
- if (!unacked.isQueueDeleted())
- {
- // Redeliver the messages to the front of the queue
- deliveryContext.requeue(unacked);
- // Deliver increments the message count but we have already deliverted this once so don't increment it again
- // this was because deliver did an increment changed this.
}
else
{
- _log.warn(System.identityHashCode(this) + " Requested requeue of message(" + unacked
+ _logger.warn(System.identityHashCode(this) + " Requested requeue of message(" + unacked
+ "):" + deliveryTag + " but no queue defined and no DeadLetter queue so DROPPING message.");
- unacked.discard(_storeContext);
+ unacked.discard();
}
}
else
{
- _log.warn("Requested requeue of message:" + deliveryTag + " but no such delivery tag exists."
+ _logger.warn("Requested requeue of message:" + deliveryTag + " but no such delivery tag exists."
+ _unacknowledgedMessageMap.size());
}
@@ -612,28 +601,31 @@
final Map<Long, QueueEntry> msgToRequeue = new LinkedHashMap<Long, QueueEntry>();
final Map<Long, QueueEntry> msgToResend = new LinkedHashMap<Long, QueueEntry>();
- if (_log.isDebugEnabled())
+ if (_logger.isDebugEnabled())
{
- _log.debug("unacked map Size:" + _unacknowledgedMessageMap.size());
+ _logger.debug("unacked map Size:" + _unacknowledgedMessageMap.size());
}
// Process the Unacked-Map.
// Marking messages who still have a consumer for to be resent
// and those that don't to be requeued.
- _unacknowledgedMessageMap.visit(new ExtractResendAndRequeue(_unacknowledgedMessageMap, msgToRequeue,
- msgToResend, requeue, _storeContext));
+ _unacknowledgedMessageMap.visit(new ExtractResendAndRequeue(_unacknowledgedMessageMap,
+ msgToRequeue,
+ msgToResend,
+ requeue,
+ _messageStore));
// Process Messages to Resend
- if (_log.isDebugEnabled())
+ if (_logger.isDebugEnabled())
{
if (!msgToResend.isEmpty())
{
- _log.debug("Preparing (" + msgToResend.size() + ") message to resend.");
+ _logger.debug("Preparing (" + msgToResend.size() + ") message to resend.");
}
else
{
- _log.debug("No message to resend.");
+ _logger.debug("No message to resend.");
}
}
@@ -653,7 +645,7 @@
// i.e. The channel hasn't been server side suspended.
// if (isSuspended())
// {
- // _log.info("Channel is suspended so requeuing");
+ // _logger.info("Channel is suspended so requeuing");
// //move this message to requeue
// msgToRequeue.add(message);
// }
@@ -669,8 +661,8 @@
if (sub != null)
{
-
- if(!queue.resend(message, sub))
+
+ if(!queue.resend(message,sub))
{
msgToRequeue.put(deliveryTag, message);
}
@@ -678,9 +670,9 @@
else
{
- if (_log.isInfoEnabled())
+ if (_logger.isInfoEnabled())
{
- _log.info("DeliveredSubscription not recorded so just requeueing(" + message.toString()
+ _logger.info("DeliveredSubscription not recorded so just requeueing(" + message.toString()
+ ")to prevent loss");
}
// move this message to requeue
@@ -689,91 +681,28 @@
} // for all messages
// } else !isSuspend
- if (_log.isInfoEnabled())
+ if (_logger.isInfoEnabled())
{
if (!msgToRequeue.isEmpty())
{
- _log.info("Preparing (" + msgToRequeue.size() + ") message to requeue to.");
+ _logger.info("Preparing (" + msgToRequeue.size() + ") message to requeue to.");
}
}
- // Deliver these messages out of the transaction as their delivery was never
- // part of the transaction only the receive.
- TransactionalContext deliveryContext;
- if (!(_txnContext instanceof NonTransactionalContext))
- {
-
- deliveryContext =
- new NonTransactionalContext(_messageStore, _storeContext, this, _returnMessages);
- }
- else
- {
- deliveryContext = _txnContext;
- }
-
// Process Messages to Requeue at the front of the queue
for (Map.Entry<Long, QueueEntry> entry : msgToRequeue.entrySet())
{
QueueEntry message = entry.getValue();
long deliveryTag = entry.getKey();
-
- message.release();
- message.setRedelivered(true);
+ _unacknowledgedMessageMap.remove(deliveryTag);
- deliveryContext.requeue(message);
+ message.setRedelivered(true);
+ message.release();
- _unacknowledgedMessageMap.remove(deliveryTag);
}
}
- /**
- * Callback indicating that a queue has been deleted. We must update the structure of unacknowledged messages to
- * remove the queue reference and also decrement any message reference counts, without actually removing the item
- * since we may get an ack for a delivery tag that was generated from the deleted queue.
- *
- * @param queue the queue that has been deleted
- *
- */
- /* public void queueDeleted(final AMQQueue queue)
- {
- try
- {
- _unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor()
- {
- public boolean callback(UnacknowledgedMessage message)
- {
- if (message.getQueue() == queue)
- {
- try
- {
- message.discard(_storeContext);
- message.setQueueDeleted(true);
- }
- catch (AMQException e)
- {
- _log.error(
- "Error decrementing ref count on message " + message.getMessage().getMessageId() + ": " + e, e);
- throw new RuntimeException(e);
- }
- }
-
- return false;
- }
-
- public void visitComplete()
- {
- }
- });
- }
- catch (AMQException e)
- {
- _log.error("Unexpected Error while handling deletion of queue", e);
- throw new RuntimeException(e);
- }
-
- }
-*/
/**
* Acknowledge one or more messages.
*
@@ -785,7 +714,17 @@
*/
public void acknowledgeMessage(long deliveryTag, boolean multiple) throws AMQException
{
- _unacknowledgedMessageMap.acknowledgeMessage(deliveryTag, multiple, _txnContext);
+ Collection<QueueEntry> ackedMessages = getAckedMessages(deliveryTag, multiple);
+ _transaction.dequeue(ackedMessages, new MessageAcknowledgeAction(ackedMessages));
+ }
+
+ private Collection<QueueEntry> getAckedMessages(long deliveryTag, boolean multiple)
+ {
+
+ Map<Long, QueueEntry> ackedMessageMap = new LinkedHashMap<Long,QueueEntry>();
+ _unacknowledgedMessageMap.collect(deliveryTag, multiple, ackedMessageMap);
+ _unacknowledgedMessageMap.remove(ackedMessageMap);
+ return ackedMessageMap.values();
}
/**
@@ -879,12 +818,69 @@
throw new AMQException("Fatal error: commit called on non-transactional channel");
}
- _txnContext.commit();
+ _transaction.commit();
+
}
public void rollback() throws AMQException
{
- _txnContext.rollback();
+ rollback(NULL_TASK);
+ }
+
+ public void rollback(Runnable postRollbackTask) throws AMQException
+ {
+ if (!isTransactional())
+ {
+ throw new AMQException("Fatal error: commit called on non-transactional channel");
+ }
+
+ // stop all subscriptions
+ _rollingBack = true;
+ boolean requiresSuspend = _suspended.compareAndSet(false,true);
+
+ // ensure all subscriptions have seen the change to the channel state
+ for(Subscription sub : _tag2SubscriptionMap.values())
+ {
+ sub.getSendLock();
+ sub.releaseSendLock();
+ }
+
+ try
+ {
+ _transaction.rollback();
+ }
+ finally
+ {
+ _rollingBack = false;
+ }
+
+ postRollbackTask.run();
+
+ for(QueueEntry entry : _resendList)
+ {
+ Subscription sub = entry.getDeliveredSubscription();
+ if(sub == null || sub.isClosed())
+ {
+ entry.release();
+ }
+ else
+ {
+ sub.getQueue().resend(entry, sub);
+ }
+ }
+ _resendList.clear();
+
+ if(requiresSuspend)
+ {
+ _suspended.set(false);
+ for(Subscription sub : _tag2SubscriptionMap.values())
+ {
+ sub.getQueue().deliverAsync(sub);
+ }
+
+ }
+
+
}
public String toString()
@@ -902,23 +898,22 @@
return _defaultQueue;
}
- public StoreContext getStoreContext()
- {
- return _storeContext;
- }
-
public void processReturns() throws AMQException
{
if (!_returnMessages.isEmpty())
{
- StoreContext sc =StoreContext.setCurrentContext(_storeContext);
+
for (RequiredDeliveryException bouncedMessage : _returnMessages)
{
ServerMessage serverMessage = bouncedMessage.getAMQMessage();
if(serverMessage instanceof AMQMessage)
{
AMQMessage message = (AMQMessage) serverMessage;
- _session.getProtocolOutputConverter().writeReturn(message, _channelId, bouncedMessage.getReplyCode().getCode(),
+ _session.getProtocolOutputConverter().writeReturn(message.getMessagePublishInfo(),
+ message.getContentHeaderBody(),
+ message.getBodyFrameIterator(_session,_channelId),
+ _channelId,
+ bouncedMessage.getReplyCode().getCode(),
new AMQShortString(bouncedMessage.getMessage()));
}
@@ -929,18 +924,12 @@
}
bouncedMessage.release();
}
- StoreContext.setCurrentContext(sc);
+
_returnMessages.clear();
}
}
-
- public TransactionalContext getTransactionalContext()
- {
- return _txnContext;
- }
-
public boolean isClosing()
{
return _closing;
@@ -962,11 +951,6 @@
_creditManager.setCreditLimits(prefetchSize, prefetchCount);
}
- public List<RequiredDeliveryException> getReturnMessages()
- {
- return _returnMessages;
- }
-
public MessageStore getMessageStore()
{
return _messageStore;
@@ -981,7 +965,7 @@
ServerMessage msg = entry.getMessage();
if(msg instanceof AMQMessage)
{
- getProtocolSession().getProtocolOutputConverter().writeDeliver((AMQMessage)msg, getChannelId(),
+ getProtocolSession().getProtocolOutputConverter().writeDeliver(entry, getChannelId(),
deliveryTag, sub.getConsumerTag());
}
else
@@ -1011,6 +995,203 @@
return _recordDeliveryMethod;
}
+
+ private AMQMessage createAMQMessage(IncomingMessage incomingMessage)
+ throws AMQException
+ {
+
+
+
+ final AMQMessageHandle messageHandle = incomingMessage.getMessageHandle();
+ final MessagePublishInfo messagePublishInfo = incomingMessage.getMessagePublishInfo();
+ final ContentHeaderBody header = incomingMessage.getContentHeader();
+
+
+
+ AMQMessage message = new AMQMessage(messageHandle, header, incomingMessage.getSize() ,messagePublishInfo);
+
+ message.setExpiration(incomingMessage.getExpiration());
+ message.setClientIdentifier(_session);
+ return message;
+ }
+
+ private boolean checkMessageUserId(ContentHeaderBody header)
+ throws UnauthorizedAccessException
+ {
+ AMQShortString userID =
+ header.properties instanceof BasicContentHeaderProperties
+ ? ((BasicContentHeaderProperties) header.properties).getUserId()
+ : null;
+
+ return (!MSG_AUTH || _session.getPrincipal().getName().equals(userID == null? "" : userID.toString()));
+
+ }
+
+ private class MessageDeliveryAction implements Transaction.Action
+ {
+ private IncomingMessage _incommingMessage;
+ private ArrayList<AMQQueue> _destinationQueues;
+
+ public MessageDeliveryAction(IncomingMessage currentMessage,
+ ArrayList<AMQQueue> destinationQueues)
+ {
+ _incommingMessage = currentMessage;
+ _destinationQueues = destinationQueues;
+ }
+
+ public void postCommit()
+ {
+ try
+ {
+ final boolean immediate = _incommingMessage.isImmediate();
+
+ Transaction txn = null;
+
+ for(AMQQueue queue : _destinationQueues)
+ {
+ QueueEntry entry = queue.enqueue(createAMQMessage(_incommingMessage));
+ queue.checkCapacity(AMQChannel.this);
+
+
+ if(immediate && !entry.getDeliveredToConsumer() && entry.acquire())
+ {
+ if(txn == null)
+ {
+ txn = new LocalTransaction(_messageStore);
+ Collection<QueueEntry> entries = new ArrayList<QueueEntry>(1);
+ entries.add(entry);
+ txn.enqueue(queue, entry.getMessage(), new MessageAcknowledgeAction(entries));
+ }
+
+
+ AMQMessage message = (AMQMessage) entry.getMessage();
+ _session.getProtocolOutputConverter().writeReturn(message.getMessagePublishInfo(),
+ message.getContentHeaderBody(),
+ message.getBodyFrameIterator(_session,_channelId),
+ _channelId,
+ AMQConstant.NO_CONSUMERS.getCode(),
+ new AMQShortString("Immediate delivery is not possible."));
+
+
+ }
+
+ }
+ if(txn != null)
+ {
+ txn.commit();
+ }
+ }
+ catch (AMQException e)
+ {
+ // TODO
+ throw new RuntimeException(e);
+ }
+
+
+
+
+
+ }
+
+ public void onRollback()
+ {
+ // Maybe keep track of entries that were created and then delete them here in case of failure
+ // to in memory enqueue
+ }
+ }
+
+ private class MessageAcknowledgeAction implements Transaction.Action
+ {
+ private final Collection<QueueEntry> _ackedMessages;
+
+
+ public MessageAcknowledgeAction(Collection<QueueEntry> ackedMessages)
+ {
+ _ackedMessages = ackedMessages;
+ }
+
+ public void postCommit()
+ {
+ try
+ {
+ for(QueueEntry entry : _ackedMessages)
+ {
+ entry.discard();
+ }
+ }
+ finally
+ {
+ _acknowledgedMessages.clear();
+ }
+
+ }
+
+ public void onRollback()
+ {
+ // explicit rollbacks resend the message after the rollback-ok is sent
+ if(_rollingBack)
+ {
+ _resendList.addAll(_ackedMessages);
+ }
+ else
+ {
+ try
+ {
+ for(QueueEntry entry : _ackedMessages)
+ {
+ entry.release();
+ }
+ }
+ finally
+ {
+ _acknowledgedMessages.clear();
+ }
+ }
+
+ }
+ }
+
+ private class WriteReturnAction implements Transaction.Action
+ {
+ private final AMQConstant _errorCode;
+ private final AMQMessage _message;
+ private final String _description;
+
+ public WriteReturnAction(AMQConstant errorCode,
+ String description,
+ AMQMessage message)
+ {
+ _errorCode = errorCode;
+ _message = message;
+ _description = description;
+ }
+
+ public void postCommit()
+ {
+ try
+ {
+ _session.getProtocolOutputConverter().writeReturn(_message.getMessagePublishInfo(),
+ _message.getContentHeaderBody(),
+ _message.getBodyFrameIterator(_session,_channelId),
+ _channelId,
+ _errorCode.getCode(),
+ new AMQShortString(_description));
+ }
+ catch (AMQException e)
+ {
+ //TODO
+ throw new RuntimeException(e);
+ }
+
+ }
+
+ public void onRollback()
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+ }
+
+
public LogActor getLogActor()
{
return _actor;
@@ -1029,7 +1210,7 @@
}
}
- public void unblock(AMQQueue queue)
+ public void unblock(AMQQueue queue)
{
if(_blockingQueues.remove(queue))
{
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/ExtractResendAndRequeue.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/ExtractResendAndRequeue.java?rev=824084&r1=824083&r2=824084&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/ExtractResendAndRequeue.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/ExtractResendAndRequeue.java Sun Oct 11 15:10:43 2009
@@ -21,12 +21,11 @@
package org.apache.qpid.server;
import org.apache.qpid.server.ack.UnacknowledgedMessageMap;
-import org.apache.qpid.server.ack.UnacknowledgedMessageMapImpl;
import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.subscription.Subscription;
-import org.apache.qpid.server.store.StoreContext;
-import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.store.TransactionLog;
+import org.apache.qpid.server.txn.Transaction;
+import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.AMQException;
import org.apache.log4j.Logger;
@@ -36,23 +35,23 @@
{
private static final Logger _log = Logger.getLogger(ExtractResendAndRequeue.class);
- private Map<Long, QueueEntry> _msgToRequeue;
- private Map<Long, QueueEntry> _msgToResend;
- private boolean _requeueIfUnabletoResend;
- private StoreContext _storeContext;
- private UnacknowledgedMessageMap _unacknowledgedMessageMap;
+ private final Map<Long, QueueEntry> _msgToRequeue;
+ private final Map<Long, QueueEntry> _msgToResend;
+ private final boolean _requeueIfUnabletoResend;
+ private final UnacknowledgedMessageMap _unacknowledgedMessageMap;
+ private final TransactionLog _transactionLog;
- public ExtractResendAndRequeue(UnacknowledgedMessageMap unacknowledgedMessageMap,
+ public ExtractResendAndRequeue(UnacknowledgedMessageMap unacknowledgedMessageMap,
Map<Long, QueueEntry> msgToRequeue,
Map<Long, QueueEntry> msgToResend,
boolean requeueIfUnabletoResend,
- StoreContext storeContext)
+ TransactionLog txnLog)
{
_unacknowledgedMessageMap = unacknowledgedMessageMap;
_msgToRequeue = msgToRequeue;
_msgToResend = msgToResend;
_requeueIfUnabletoResend = requeueIfUnabletoResend;
- _storeContext = storeContext;
+ _transactionLog = txnLog;
}
public boolean callback(final long deliveryTag, QueueEntry message) throws AMQException
@@ -85,13 +84,14 @@
}
else
{
- message.discard(_storeContext);
+
+ dequeueEntry(message);
_log.info("No DeadLetter Queue and requeue not requested so dropping message:" + message);
}
}
else
{
- message.discard(_storeContext);
+ dequeueEntry(message);
_log.warn("Message.queue is null and no DeadLetter Queue so dropping message:" + message);
}
}
@@ -100,6 +100,31 @@
return false;
}
+
+ private void dequeueEntry(final QueueEntry node)
+ {
+ Transaction txn = new AutoCommitTransaction(_transactionLog);
+ dequeueEntry(node, txn);
+ }
+
+ private void dequeueEntry(final QueueEntry node, Transaction txn)
+ {
+ txn.dequeue(node.getQueue(), node.getMessage(),
+ new Transaction.Action()
+ {
+
+ public void postCommit()
+ {
+ node.discard();
+ }
+
+ public void onRollback()
+ {
+
+ }
+ });
+ }
+
public void visitComplete()
{
_unacknowledgedMessageMap.clear();
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java?rev=824084&r1=824083&r2=824084&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java Sun Oct 11 15:10:43 2009
@@ -335,8 +335,7 @@
String keystorePassword = serverConfig.getKeystorePassword();
String certType = serverConfig.getCertType();
SSLContextFactory sslFactory = null;
- boolean isSsl = false;
-
+
if (!serverConfig.getSSLOnly())
{
NetworkDriver driver = new MINANetworkDriver();
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java?rev=824084&r1=824083&r2=824084&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java Sun Oct 11 15:10:43 2009
@@ -82,6 +82,6 @@
public void release()
{
- _amqMessage.release();
+ //_amqMessage.release();
}
}
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java?rev=824084&r1=824083&r2=824084&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java Sun Oct 11 15:10:43 2009
@@ -21,14 +21,12 @@
package org.apache.qpid.server.ack;
import java.util.Collection;
-import java.util.List;
import java.util.Set;
import java.util.Map;
import org.apache.qpid.AMQException;
-import org.apache.qpid.server.txn.TransactionalContext;
import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.store.StoreContext;
+
public interface UnacknowledgedMessageMap
{
@@ -50,18 +48,12 @@
void collect(long deliveryTag, boolean multiple, Map<Long, QueueEntry> msgs);
- boolean contains(long deliveryTag) throws AMQException;
-
void remove(Map<Long,QueueEntry> msgs);
QueueEntry remove(long deliveryTag);
- public void drainTo(long deliveryTag, StoreContext storeContext) throws AMQException;
-
Collection<QueueEntry> cancelAllMessages();
- void acknowledgeMessage(long deliveryTag, boolean multiple, TransactionalContext txnContext) throws AMQException;
-
int size();
void clear();
@@ -75,7 +67,6 @@
*/
Set<Long> getDeliveryTags();
- public long getUnacknowledgeBytes();
}
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java?rev=824084&r1=824083&r2=824084&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java Sun Oct 11 15:10:43 2009
@@ -20,20 +20,13 @@
*/
package org.apache.qpid.server.ack;
-import org.apache.qpid.server.store.StoreContext;
import java.util.Collection;
-import java.util.Iterator;
import java.util.LinkedHashMap;
-import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.txn.TransactionalContext;
public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap
{
@@ -61,19 +54,15 @@
}
else
{
- msgs.put(deliveryTag, get(deliveryTag));
+ final QueueEntry entry = get(deliveryTag);
+ if(entry != null)
+ {
+ msgs.put(deliveryTag, entry);
+ }
}
}
- public boolean contains(long deliveryTag) throws AMQException
- {
- synchronized (_lock)
- {
- return _map.containsKey(deliveryTag);
- }
- }
-
public void remove(Map<Long,QueueEntry> msgs)
{
synchronized (_lock)
@@ -135,15 +124,6 @@
}
}
- public void acknowledgeMessage(long deliveryTag, boolean multiple, TransactionalContext txnContext)
- throws AMQException
- {
- synchronized (_lock)
- {
- txnContext.acknowledgeMessage(deliveryTag, _lastDeliveryTag, multiple, this);
- }
- }
-
public int size()
{
synchronized (_lock)
@@ -161,39 +141,6 @@
}
}
- public void drainTo(long deliveryTag, StoreContext storeContext) throws AMQException
-
- {
- synchronized (_lock)
- {
- Iterator<Map.Entry<Long, QueueEntry>> it = _map.entrySet().iterator();
- while (it.hasNext())
- {
- Map.Entry<Long, QueueEntry> unacked = it.next();
-
- if (unacked.getKey() > deliveryTag)
- {
- //This should not occur now.
- throw new AMQException("UnacknowledgedMessageMap is out of order:" + unacked.getKey() +
- " When deliveryTag is:" + deliveryTag + "ES:" + _map.entrySet().toString());
- }
-
- //Message has been ack so discard it. This will dequeue and decrement the reference.
- unacked.getValue().discard(storeContext);
-
- it.remove();
-
- _unackedSize -= unacked.getValue().getMessage().getSize();
-
-
- if (unacked.getKey() == deliveryTag)
- {
- break;
- }
- }
- }
- }
-
public QueueEntry get(long key)
{
synchronized (_lock)
@@ -225,8 +172,4 @@
}
}
- public long getUnacknowledgeBytes()
- {
- return _unackedSize;
- }
}
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java?rev=824084&r1=824083&r2=824084&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java Sun Oct 11 15:10:43 2009
@@ -25,7 +25,7 @@
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.protocol.ExchangeInitialiser;
import org.apache.qpid.server.queue.IncomingMessage;
-import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.Collection;
@@ -61,9 +61,9 @@
return getExchange(new AMQShortString(exchangeName));
}
- public MessageStore getMessageStore()
+ public DurableConfigurationStore getDurableConfigurationStore()
{
- return _host.getMessageStore();
+ return _host.getDurableConfigurationStore();
}
public void registerExchange(Exchange exchange) throws AMQException
@@ -71,7 +71,7 @@
_exchangeMap.put(exchange.getName(), exchange);
if (exchange.isDurable())
{
- getMessageStore().createExchange(exchange);
+ getDurableConfigurationStore().createExchange(exchange);
}
}
@@ -98,7 +98,7 @@
{
if (e.isDurable())
{
- getMessageStore().removeExchange(e);
+ getDurableConfigurationStore().removeExchange(e);
}
e.close();
}
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java?rev=824084&r1=824083&r2=824084&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java Sun Oct 11 15:10:43 2009
@@ -209,7 +209,7 @@
}
}
- public ArrayList<AMQQueue> route(InboundMessage payload) throws AMQException
+ public ArrayList<AMQQueue> route(InboundMessage payload)
{
final AMQShortString routingKey = payload.getRoutingKey() == null ? AMQShortString.EMPTY_STRING : new AMQShortString(payload.getRoutingKey());
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java?rev=824084&r1=824083&r2=824084&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java Sun Oct 11 15:10:43 2009
@@ -56,7 +56,7 @@
void deregisterQueue(AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException;
- ArrayList<AMQQueue> route(InboundMessage message) throws AMQException;
+ ArrayList<AMQQueue> route(InboundMessage message);
/**
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java?rev=824084&r1=824083&r2=824084&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java Sun Oct 11 15:10:43 2009
@@ -199,7 +199,7 @@
}
}
- public ArrayList<AMQQueue> route(InboundMessage payload) throws AMQException
+ public ArrayList<AMQQueue> route(InboundMessage payload)
{
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java?rev=824084&r1=824083&r2=824084&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java Sun Oct 11 15:10:43 2009
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -88,7 +88,7 @@
* Creates a binding for a set of mappings. Those mappings whose value is
* null or the empty string are assumed only to be required headers, with
* no constraint on the value. Those with a non-null value are assumed to
- * define a required match of value.
+ * define a required match of value.
* @param mappings the defined mappings this binding should use
*/
@@ -174,14 +174,21 @@
private boolean or(final AMQMessageHeader headers)
{
- if(required.isEmpty() || passesRequiredOr(headers))
+ if(required.isEmpty())
{
- return ((!matches.isEmpty()) && passesMatchesOr(headers))
- || (required.isEmpty() && matches.isEmpty());
+ return matches.isEmpty() || passesMatchesOr(headers);
}
else
{
- return true;
+ if(!passesRequiredOr(headers))
+ {
+ return !matches.isEmpty() && passesMatchesOr(headers);
+ }
+ else
+ {
+ return true;
+ }
+
}
}
@@ -190,7 +197,7 @@
for(Map.Entry<String,Object> entry : matches.entrySet())
{
if(headers.containsHeader(entry.getKey())
- || ((entry.getValue() == null && headers.getHeader(entry.getKey()) == null)
+ && ((entry.getValue() == null && headers.getHeader(entry.getKey()) == null)
|| (entry.getValue().equals(headers.getHeader(entry.getKey())))))
{
return true;
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java?rev=824084&r1=824083&r2=824084&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java Sun Oct 11 15:10:43 2009
@@ -253,7 +253,7 @@
}
}
- public ArrayList<AMQQueue> route(InboundMessage payload) throws AMQException
+ public ArrayList<AMQQueue> route(InboundMessage payload)
{
AMQMessageHeader header = payload.getMessageHeader();
if (_logger.isDebugEnabled())
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java?rev=824084&r1=824083&r2=824084&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java Sun Oct 11 15:10:43 2009
@@ -535,7 +535,7 @@
return normalizedString;
}
- public ArrayList<AMQQueue> route(InboundMessage payload) throws AMQException
+ public ArrayList<AMQQueue> route(InboundMessage payload)
{
final AMQShortString routingKey = payload.getRoutingKey() == null
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java?rev=824084&r1=824083&r2=824084&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java Sun Oct 11 15:10:43 2009
@@ -133,7 +133,7 @@
singleMessageCredit.useCreditForMessage(entry.getMessage());
if(entry.getMessage() instanceof AMQMessage)
{
- session.getProtocolOutputConverter().writeGetOk((AMQMessage)(entry.getMessage()), channel.getChannelId(),
+ session.getProtocolOutputConverter().writeGetOk(entry, channel.getChannelId(),
deliveryTag, queue.getMessageCount());
}
else
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java?rev=824084&r1=824083&r2=824084&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java Sun Oct 11 15:10:43 2009
@@ -81,7 +81,7 @@
message = channel.getUnacknowledgedMessageMap().remove(deliveryTag);
if(message != null)
{
- message.discard(channel.getStoreContext());
+ message.discard();
}
//sendtoDeadLetterQueue(msg)
return;
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java?rev=824084&r1=824083&r2=824084&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java Sun Oct 11 15:10:43 2009
@@ -41,6 +41,7 @@
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.virtualhost.VirtualHost;
public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclareBody>
@@ -60,11 +61,11 @@
public void methodReceived(AMQStateManager stateManager, QueueDeclareBody body, int channelId) throws AMQException
{
- AMQProtocolSession session = stateManager.getProtocolSession();
+ final AMQProtocolSession session = stateManager.getProtocolSession();
VirtualHost virtualHost = session.getVirtualHost();
ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry();
QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
- MessageStore store = virtualHost.getMessageStore();
+ DurableConfigurationStore store = virtualHost.getDurableConfigurationStore();
if (!body.getPassive())
@@ -115,6 +116,25 @@
store.createQueue(queue, body.getArguments());
}
queueRegistry.registerQueue(queue);
+ if(queue.isExclusive() && !queue.isAutoDelete())
+ {
+ final AMQQueue q = queue;
+ queue.setExclusiveOwner(session);
+ final AMQProtocolSession.Task sessionCloseTask = new AMQProtocolSession.Task()
+ {
+ public void doTask(AMQProtocolSession session) throws AMQException
+ {
+ q.setExclusiveOwner(null);
+ }
+ };
+ session.addSessionCloseTask(sessionCloseTask);
+ queue.addQueueDeleteTask(new AMQQueue.Task() {
+ public void doTask(AMQQueue queue) throws AMQException
+ {
+ session.removeSessionCloseTask(sessionCloseTask);
+ }
+ });
+ }
if (autoRegister)
{
Exchange defaultExchange = exchangeRegistry.getDefaultExchange();
@@ -125,16 +145,17 @@
}
}
else if (queue.getPrincipalHolder() != null
- && queue.getPrincipalHolder().getPrincipal() != null
- && queue.getPrincipalHolder().getPrincipal().getName() != null
- && !session.getContextKey().equals(new AMQShortString(queue.getPrincipalHolder().getPrincipal().getName())))
+ && queue.getPrincipalHolder().getPrincipal() != null
+ && queue.getPrincipalHolder().getPrincipal().getName() != null
+ && (!queue.getPrincipalHolder().getPrincipal().getName().equals(session.getPrincipal().getName())
+ || ((!body.getPassive() && queue.getExclusiveOwner() != null && queue.getExclusiveOwner() != session))))
{
throw body.getChannelException(AMQConstant.ALREADY_EXISTS, "Cannot declare queue('" + queueName + "'),"
+ " as exclusive queue with same name "
+ "declared on another client ID('"
+ queue.getPrincipalHolder().getPrincipal().getName() + "')");
- }
+ }
AMQChannel channel = session.getChannel(channelId);
if (channel == null)
@@ -201,7 +222,7 @@
}
});
}// if exclusive and not durable
-
+
return queue;
}
}
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java?rev=824084&r1=824083&r2=824084&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java Sun Oct 11 15:10:43 2009
@@ -31,6 +31,7 @@
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.security.access.Permission;
@@ -62,7 +63,7 @@
AMQProtocolSession session = stateManager.getProtocolSession();
VirtualHost virtualHost = session.getVirtualHost();
QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
- MessageStore store = virtualHost.getMessageStore();
+ DurableConfigurationStore store = virtualHost.getDurableConfigurationStore();
AMQQueue queue;
if (body.getQueue() == null)
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java?rev=824084&r1=824083&r2=824084&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java Sun Oct 11 15:10:43 2009
@@ -33,7 +33,6 @@
import org.apache.qpid.server.state.StateAwareMethodListener;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.AMQChannel;
-import org.apache.qpid.server.security.access.Permission;
public class QueuePurgeHandler implements StateAwareMethodListener<QueuePurgeBody>
{
@@ -106,7 +105,7 @@
throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, "Permission denied");
}
- long purged = queue.clearQueue(channel.getStoreContext());
+ long purged = queue.clearQueue();
if(!body.getNowait())
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java?rev=824084&r1=824083&r2=824084&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java Sun Oct 11 15:10:43 2009
@@ -26,7 +26,6 @@
import org.apache.qpid.framing.MethodRegistry;
import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.server.AMQChannel;
-import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
@@ -62,22 +61,16 @@
{
throw body.getChannelNotFoundException(channelId);
}
- StoreContext.setCurrentContext(channel.getStoreContext());
channel.commit();
MethodRegistry methodRegistry = session.getMethodRegistry();
AMQMethodBody responseBody = methodRegistry.createTxCommitOkBody();
session.writeFrame(responseBody.generateFrame(channelId));
-
- channel.processReturns();
+
}
catch (AMQException e)
{
throw body.getChannelException(e.getErrorCode(), "Failed to commit: " + e.getMessage());
}
- finally
- {
- StoreContext.clearCurrentContext();
- }
}
}
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java?rev=824084&r1=824083&r2=824084&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java Sun Oct 11 15:10:43 2009
@@ -44,9 +44,9 @@
{
}
- public void methodReceived(AMQStateManager stateManager, TxRollbackBody body, int channelId) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, TxRollbackBody body, final int channelId) throws AMQException
{
- AMQProtocolSession session = stateManager.getProtocolSession();
+ final AMQProtocolSession session = stateManager.getProtocolSession();
try
{
@@ -57,17 +57,22 @@
throw body.getChannelNotFoundException(channelId);
}
- channel.rollback();
- MethodRegistry methodRegistry = session.getMethodRegistry();
- AMQMethodBody responseBody = methodRegistry.createTxRollbackOkBody();
- session.writeFrame(responseBody.generateFrame(channelId));
+ final MethodRegistry methodRegistry = session.getMethodRegistry();
+ final AMQMethodBody responseBody = methodRegistry.createTxRollbackOkBody();
+
+ Runnable task = new Runnable()
+ {
+
+ public void run()
+ {
+ session.writeFrame(responseBody.generateFrame(channelId));
+ }
+ };
+
+ channel.rollback(task);
- //Now resend all the unacknowledged messages back to the original subscribers.
- //(Must be done after the TxnRollback-ok response).
- // Why, are we not allowed to send messages back to client before the ok method?
- channel.resend(false);
}
catch (AMQException e)
{
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org