You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2009/10/25 23:59:05 UTC
svn commit: r829675 [5/11] - in /qpid/trunk/qpid/java: ./
broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/
broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/example/
broker/ broker/bin/ broker/src/main/java/org/apac...
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java Sun Oct 25 22:58:57 2009
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -28,7 +28,7 @@
import org.apache.qpid.AMQConnectionException;
import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
import org.apache.qpid.server.AMQChannel;
-import org.apache.qpid.server.logging.RootMessageLogger;
+import org.apache.qpid.server.security.PrincipalHolder;
import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.output.ProtocolOutputConverter;
import org.apache.qpid.server.virtualhost.VirtualHost;
@@ -37,12 +37,18 @@
import java.util.List;
-public interface AMQProtocolSession extends AMQVersionAwareProtocolSession
+public interface AMQProtocolSession extends AMQVersionAwareProtocolSession, PrincipalHolder
{
long getSessionID();
LogActor getLogActor();
+ void setMaxFrameSize(long frameMax);
+
+ long getMaxFrameSize();
+
+ boolean isClosing();
+
public static final class ProtocolSessionIdentifier
{
private final Object _sessionIdentifier;
@@ -201,9 +207,6 @@
void setAuthorizedID(Principal authorizedID);
- /** @return a Principal that was used to authorized this session */
- Principal getAuthorizedID();
-
public java.net.SocketAddress getRemoteAddress();
public MethodRegistry getMethodRegistry();
@@ -224,8 +227,10 @@
void commitTransactions(AMQChannel channel) throws AMQException;
+ void rollbackTransactions(AMQChannel channel) throws AMQException;
+
List<AMQChannel> getChannels();
void closeIfLingeringClosedChannels();
-
+
}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java Sun Oct 25 22:58:57 2009
@@ -133,7 +133,7 @@
public String getAuthorizedId()
{
- return (_protocolSession.getAuthorizedID() != null ) ? _protocolSession.getAuthorizedID().getName() : null;
+ return (_protocolSession.getPrincipal() != null ) ? _protocolSession.getPrincipal().getName() : null;
}
public String getVersion()
@@ -227,7 +227,7 @@
throw new JMException("The channel (channel Id = " + channelId + ") does not exist");
}
- _protocolSession.commitTransactions(channel);
+ _protocolSession.rollbackTransactions(channel);
}
catch (AMQException ex)
{
Propchange: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sun Oct 25 22:58:57 2009
@@ -1,3 +1,5 @@
/qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/queue:757257
+/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue:795950-829653
+/qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/queue:805429-821809
/qpid/branches/jmx_mc_gsoc09/qpid/java/broker/src/main/java/org/apache/qpid/server/queue:787599
/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue:753219-753220,753253,757270,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java Sun Oct 25 22:58:57 2009
@@ -24,7 +24,6 @@
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.subscription.SubscriptionList;
import org.apache.qpid.server.subscription.Subscription;
-import org.apache.qpid.AMQException;
public class AMQPriorityQueue extends SimpleAMQQueue
{
@@ -34,11 +33,19 @@
final boolean autoDelete,
final VirtualHost virtualHost,
int priorities)
- throws AMQException
{
super(name, durable, owner, autoDelete, virtualHost, new PriorityQueueList.Factory(priorities));
}
+ public AMQPriorityQueue(String queueName,
+ boolean durable,
+ String owner,
+ boolean autoDelete,
+ VirtualHost virtualHost, int priorities)
+ {
+ this(new AMQShortString(queueName), durable, new AMQShortString(owner),autoDelete,virtualHost,priorities);
+ }
+
public int getPriorities()
{
return ((PriorityQueueList) _entries).getPriorities();
@@ -52,16 +59,25 @@
while(subIter.advance() && !entry.isAcquired())
{
final Subscription subscription = subIter.getNode().getSubscription();
- QueueEntry subnode = subscription.getLastSeenEntry();
- while(subnode != null && entry.compareTo(subnode) < 0 && !entry.isAcquired())
+ if(!subscription.isClosed())
{
- if(subscription.setLastSeenEntry(subnode,entry))
- {
- break;
- }
- else
+ QueueContext context = (QueueContext) subscription.getQueueContext();
+ if(context != null)
{
- subnode = subscription.getLastSeenEntry();
+ QueueEntry subnode = context._lastSeenEntry;
+ QueueEntry released = context._releasedEntry;
+ while(subnode != null && entry.compareTo(subnode) < 0 && !entry.isAcquired() && (released == null || released.compareTo(entry) < 0))
+ {
+ if(QueueContext._releasedUpdater.compareAndSet(context,released,entry))
+ {
+ break;
+ }
+ else
+ {
+ subnode = context._lastSeenEntry;
+ released = context._releasedEntry;
+ }
+ }
}
}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java Sun Oct 25 22:58:57 2009
@@ -20,34 +20,49 @@
*/
package org.apache.qpid.server.queue;
-import org.apache.commons.configuration.CompositeConfiguration;
-import org.apache.commons.configuration.Configuration;
import org.apache.qpid.server.management.Managable;
import org.apache.qpid.server.management.ManagedObject;
-import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.configuration.QueueConfiguration;
import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.exchange.ExchangeReferrer;
import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.store.TransactionLogResource;
+import org.apache.qpid.server.security.PrincipalHolder;
+import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.AMQException;
import java.util.List;
import java.util.Set;
+import java.util.Map;
-public interface AMQQueue extends Managable, Comparable<AMQQueue>
+public interface AMQQueue extends Managable, Comparable<AMQQueue>, ExchangeReferrer, TransactionLogResource
{
+
+ public interface Context
+ {
+ QueueEntry getLastSeenEntry();
+ }
+
AMQShortString getName();
+ void setNoLocal(boolean b);
+
boolean isDurable();
boolean isAutoDelete();
AMQShortString getOwner();
+ PrincipalHolder getPrincipalHolder();
+ void setPrincipalHolder(PrincipalHolder principalHolder);
+
+ void setExclusiveOwner(Object owner);
+ Object getExclusiveOwner();
VirtualHost getVirtualHost();
@@ -89,17 +104,19 @@
int delete() throws AMQException;
- QueueEntry enqueue(StoreContext storeContext, AMQMessage message) throws AMQException;
+ QueueEntry enqueue(ServerMessage message) throws AMQException;
+
+ void requeue(QueueEntry entry);
- void requeue(StoreContext storeContext, QueueEntry entry) throws AMQException;
+ void requeue(QueueEntryImpl storeContext, Subscription subscription);
- void dequeue(StoreContext storeContext, QueueEntry entry) throws FailedDequeueException;
+ void dequeue(QueueEntry entry);
boolean resend(final QueueEntry entry, final Subscription subscription) throws AMQException;
-
+
void addQueueDeleteTask(final Task task);
@@ -113,11 +130,11 @@
List<Long> getMessagesOnTheQueue(int num, int offest);
QueueEntry getMessageOnTheQueue(long messageId);
-
+
/**
* Returns a list of QueEntries from a given range of queue positions, eg messages 5 to 10 on the queue.
- *
- * The 'queue position' index starts from 1. Using 0 in 'from' will be ignored and continue from 1.
+ *
+ * The 'queue position' index starts from 1. Using 0 in 'from' will be ignored and continue from 1.
* Using 0 in the 'to' field will return an empty list regardless of the 'from' value.
* @param fromPosition
* @param toPosition
@@ -127,11 +144,11 @@
void moveMessagesToAnotherQueue(long fromMessageId, long toMessageId, String queueName,
- StoreContext storeContext);
+ ServerTransaction transaction);
- void copyMessagesToAnotherQueue(long fromMessageId, long toMessageId, String queueName, StoreContext storeContext);
+ void copyMessagesToAnotherQueue(long fromMessageId, long toMessageId, String queueName, ServerTransaction transaction);
- void removeMessagesFromQueue(long fromMessageId, long toMessageId, StoreContext storeContext);
+ void removeMessagesFromQueue(long fromMessageId, long toMessageId);
@@ -171,9 +188,9 @@
- void deleteMessageFromTop(StoreContext storeContext) throws AMQException;
+ void deleteMessageFromTop();
- long clearQueue(StoreContext storeContext) throws AMQException;
+ long clearQueue();
/**
* Checks the status of messages on the queue, purging expired ones, firing age related alerts etc.
@@ -191,6 +208,14 @@
void stop();
+ boolean isExclusive();
+
+ Exchange getAlternateExchange();
+
+ void setAlternateExchange(Exchange exchange);
+
+ Map<String, Object> getArguments();
+
void checkCapacity(AMQChannel channel);
/**
@@ -242,6 +267,6 @@
}
void configure(QueueConfiguration config);
-
+
ManagedObject getManagedObject();
}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java Sun Oct 25 22:58:57 2009
@@ -27,7 +27,6 @@
import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.Map;
-import java.util.HashMap;
public class AMQQueueFactory
@@ -130,7 +129,6 @@
AMQShortString owner,
boolean autoDelete,
VirtualHost virtualHost, final FieldTable arguments)
- throws AMQException
{
final int priorities = arguments == null ? 1 : arguments.containsKey(X_QPID_PRIORITIES) ? arguments.getInteger(X_QPID_PRIORITIES) : 1;
@@ -189,4 +187,39 @@
q.configure(config);
return q;
}
+
+ public static AMQQueue createAMQQueueImpl(String queueName,
+ boolean durable,
+ String owner,
+ boolean autoDelete,
+ VirtualHost virtualHost, Map<String, Object> arguments)
+ throws AMQException
+ {
+ int priorities = 1;
+ if(arguments != null && arguments.containsKey(X_QPID_PRIORITIES))
+ {
+ Object prioritiesObj = arguments.get(X_QPID_PRIORITIES);
+ if(prioritiesObj instanceof Number)
+ {
+ priorities = ((Number)prioritiesObj).intValue();
+ }
+ }
+
+
+ AMQQueue q = null;
+ if(priorities > 1)
+ {
+ q = new AMQPriorityQueue(queueName, durable, owner, autoDelete, virtualHost, priorities);
+ }
+ else
+ {
+ q = new SimpleAMQQueue(queueName, durable, owner, autoDelete, virtualHost);
+ }
+
+ //Register the new queue
+ virtualHost.getQueueRegistry().registerQueue(q);
+ q.configure(virtualHost.getConfiguration().getQueueConfiguration(queueName));
+ return q;
+
+ }
}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java Sun Oct 25 22:58:57 2009
@@ -22,23 +22,21 @@
import org.apache.log4j.Logger;
-import org.apache.mina.common.ByteBuffer;
-
import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
-import org.apache.qpid.framing.CommonContentHeaderProperties;
import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.abstraction.ContentChunk;
import org.apache.qpid.management.common.mbeans.ManagedQueue;
import org.apache.qpid.management.common.mbeans.annotations.MBeanConstructor;
import org.apache.qpid.management.common.mbeans.annotations.MBeanDescription;
import org.apache.qpid.server.management.AMQManagedObject;
import org.apache.qpid.server.management.ManagedObject;
-import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.message.AMQMessageHeader;
+import org.apache.qpid.server.message.AMQMessage;
+import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.txn.LocalTransaction;
import javax.management.JMException;
-import javax.management.MBeanException;
import javax.management.MBeanNotificationInfo;
import javax.management.Notification;
import javax.management.OperationsException;
@@ -72,12 +70,6 @@
private static final SimpleDateFormat _dateFormat = new SimpleDateFormat("MM-dd-yy HH:mm:ss.SSS z");
- /**
- * Since the MBean is not associated with a real channel we can safely create our own store context
- * for use in the few methods that require one.
- */
- private StoreContext _storeContext = new StoreContext();
-
private AMQQueue _queue = null;
private String _queueName = null;
// OpenMBean data types for viewMessages method
@@ -131,7 +123,7 @@
_msgContentAttributeTypes[1] = SimpleType.STRING; // For MimeType
_msgContentAttributeTypes[2] = SimpleType.STRING; // For Encoding
_msgContentAttributeTypes[3] = new ArrayType(1, SimpleType.BYTE); // For message content
- _msgContentType = new CompositeType("Message Content", "AMQ Message Content",
+ _msgContentType = new CompositeType("Message Content", "AMQ Message Content",
VIEW_MSG_CONTENT_COMPOSITE_ITEM_NAMES, VIEW_MSG_CONTENT_COMPOSITE_ITEM_DESCRIPTIONS,
_msgContentAttributeTypes);
@@ -141,9 +133,9 @@
_msgAttributeTypes[3] = SimpleType.BOOLEAN; // For redelivered
_msgAttributeTypes[4] = SimpleType.LONG; // For queue position
- _messageDataType = new CompositeType("Message", "AMQ Message", VIEW_MSGS_COMPOSITE_ITEM_NAMES,
+ _messageDataType = new CompositeType("Message", "AMQ Message", VIEW_MSGS_COMPOSITE_ITEM_NAMES,
VIEW_MSGS_COMPOSITE_ITEM_DESCRIPTIONS, _msgAttributeTypes);
- _messagelistDataType = new TabularType("Messages", "List of messages", _messageDataType,
+ _messagelistDataType = new TabularType("Messages", "List of messages", _messageDataType,
VIEW_MSGS_TABULAR_UNIQUE_INDEX);
}
@@ -164,7 +156,11 @@
public String getOwner()
{
- return String.valueOf(_queue.getOwner());
+ return String.valueOf(_queue.getPrincipalHolder() == null
+ ? null
+ : _queue.getPrincipalHolder().getPrincipal() == null
+ ? null
+ : _queue.getPrincipalHolder().getPrincipal().getName());
}
public boolean isAutoDelete()
@@ -246,7 +242,7 @@
/**
* Checks if there is any notification to be send to the listeners
*/
- public void checkForNotification(AMQMessage msg) throws AMQException
+ public void checkForNotification(ServerMessage msg) throws AMQException
{
final Set<NotificationCheck> notificationChecks = _queue.getNotificationChecks();
@@ -296,32 +292,18 @@
*/
public void deleteMessageFromTop() throws JMException
{
- try
- {
- _queue.deleteMessageFromTop(_storeContext);
- }
- catch (AMQException ex)
- {
- throw new MBeanException(ex, ex.toString());
- }
+ _queue.deleteMessageFromTop();
}
/**
* Clears the queue of non-acquired messages
- *
+ *
* @return the number of messages deleted
* @see AMQQueue#clearQueue
*/
public Long clearQueue() throws JMException
{
- try
- {
- return _queue.clearQueue(_storeContext);
- }
- catch (AMQException ex)
- {
- throw new MBeanException(ex, ex.toString());
- }
+ return _queue.clearQueue();
}
/**
@@ -336,49 +318,41 @@
throw new OperationsException("AMQMessage with message id = " + msgId + " is not in the " + _queueName);
}
- AMQMessage msg = entry.getMessage();
- // get message content
- Iterator<ContentChunk> cBodies = msg.getContentBodyIterator();
+ ServerMessage serverMsg = entry.getMessage();
+ final int bodySize = (int) serverMsg.getSize();
+
+
List<Byte> msgContent = new ArrayList<Byte>();
- while (cBodies.hasNext())
- {
- ContentChunk body = cBodies.next();
- if (body.getSize() != 0)
- {
- if (body.getSize() != 0)
- {
- ByteBuffer slice = body.getData().slice();
- for (int j = 0; j < slice.limit(); j++)
- {
- msgContent.add(slice.get());
- }
- }
- }
- }
- try
+ java.nio.ByteBuffer buf = java.nio.ByteBuffer.allocate(bodySize);
+ int position = 0;
+
+ while(position < bodySize)
{
- // Create header attributes list
- CommonContentHeaderProperties headerProperties =
- (CommonContentHeaderProperties) msg.getContentHeaderBody().properties;
- String mimeType = null, encoding = null;
- if (headerProperties != null)
+ position += serverMsg.getContent(buf, position);
+ buf.flip();
+ for(int i = 0; i < buf.limit(); i++)
{
- AMQShortString mimeTypeShortSting = headerProperties.getContentType();
- mimeType = (mimeTypeShortSting == null) ? null : mimeTypeShortSting.toString();
- encoding = (headerProperties.getEncoding() == null) ? "" : headerProperties.getEncoding().toString();
+ msgContent.add(buf.get(i));
}
+ buf.clear();
+ }
- Object[] itemValues = { msgId, mimeType, encoding, msgContent.toArray(new Byte[0]) };
+ AMQMessageHeader header = serverMsg.getMessageHeader();
- return new CompositeDataSupport(_msgContentType, VIEW_MSG_CONTENT_COMPOSITE_ITEM_NAMES, itemValues);
- }
- catch (AMQException e)
+ String mimeType = null, encoding = null;
+ if (header != null)
{
- JMException jme = new JMException("Error creating header attributes list: " + e);
- jme.initCause(e);
- throw jme;
+ mimeType = header.getMimeType();
+
+ encoding = header.getEncoding();
}
+
+
+ Object[] itemValues = { msgId, mimeType, encoding, msgContent.toArray(new Byte[0]) };
+
+ return new CompositeDataSupport(_msgContentType, VIEW_MSG_CONTENT_COMPOSITE_ITEM_NAMES, itemValues);
+
}
/**
@@ -390,8 +364,8 @@
{
return viewMessages((long)beginIndex,(long)endIndex);
}
-
-
+
+
/**
* Returns the header contents of the messages stored in this queue in tabular form.
* @param startPosition The queue position of the first message to be viewed
@@ -404,7 +378,7 @@
throw new OperationsException("From Index = " + startPosition + ", To Index = " + endPosition
+ "\n\"From Index\" should be greater than 0 and less than \"To Index\"");
}
-
+
if ((endPosition - startPosition) > Integer.MAX_VALUE)
{
throw new OperationsException("Specified MessageID interval is too large. Intervals must be less than 2^31 in size");
@@ -421,13 +395,22 @@
for (int i = 0; i < size ; i++)
{
long position = startPosition + i;
- AMQMessage msg = list.get(i).getMessage();
- ContentHeaderBody headerBody = msg.getContentHeaderBody();
- // Create header attributes list
- String[] headerAttributes = getMessageHeaderProperties(headerBody);
- Object[] itemValues = { msg.getMessageId(), headerAttributes, headerBody.bodySize, msg.isRedelivered(), position};
- CompositeData messageData = new CompositeDataSupport(_messageDataType, VIEW_MSGS_COMPOSITE_ITEM_NAMES, itemValues);
- _messageList.put(messageData);
+ final QueueEntry queueEntry = list.get(i);
+ ServerMessage serverMsg = queueEntry.getMessage();
+ if(serverMsg instanceof AMQMessage)
+ {
+ AMQMessage msg = (AMQMessage) serverMsg;
+ ContentHeaderBody headerBody = msg.getContentHeaderBody();
+ // Create header attributes list
+ String[] headerAttributes = getMessageHeaderProperties(headerBody);
+ Object[] itemValues = { msg.getMessageId(), headerAttributes, headerBody.bodySize, queueEntry.isRedelivered(), position};
+ CompositeData messageData = new CompositeDataSupport(_messageDataType, VIEW_MSGS_COMPOSITE_ITEM_NAMES, itemValues);
+ _messageList.put(messageData);
+ }
+ else
+ {
+ // TODO 0-10 Message
+ }
}
}
catch (AMQException e)
@@ -484,7 +467,9 @@
throw new OperationsException("\"From MessageId\" should be greater than 0 and less than \"To MessageId\"");
}
- _queue.moveMessagesToAnotherQueue(fromMessageId, toMessageId, toQueueName, _storeContext);
+ ServerTransaction txn = new LocalTransaction(_queue.getVirtualHost().getTransactionLog());
+ _queue.moveMessagesToAnotherQueue(fromMessageId, toMessageId, toQueueName, txn);
+ txn.commit();
}
/**
@@ -500,9 +485,9 @@
throw new OperationsException("\"From MessageId\" should be greater than 0 and less than \"To MessageId\"");
}
- _queue.removeMessagesFromQueue(fromMessageId, toMessageId, _storeContext);
+ _queue.removeMessagesFromQueue(fromMessageId, toMessageId);
}
-
+
/**
* @see ManagedQueue#copyMessages
* @param fromMessageId
@@ -517,9 +502,15 @@
throw new OperationsException("\"From MessageId\" should be greater than 0 and less than \"To MessageId\"");
}
- _queue.copyMessagesToAnotherQueue(fromMessageId, toMessageId, toQueueName, _storeContext);
+ ServerTransaction txn = new LocalTransaction(_queue.getVirtualHost().getTransactionLog());
+
+ _queue.copyMessagesToAnotherQueue(fromMessageId, toMessageId, toQueueName, txn);
+
+ txn.commit();
+
+
}
-
+
/**
* returns Notifications sent by this MBean.
*/
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java Sun Oct 25 22:58:57 2009
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -44,12 +44,12 @@
return _virtualHost;
}
- public void registerQueue(AMQQueue queue) throws AMQException
+ public void registerQueue(AMQQueue queue)
{
_queueMap.put(queue.getName(), queue);
}
- public void unregisterQueue(AMQShortString name) throws AMQException
+ public void unregisterQueue(AMQShortString name)
{
_queueMap.remove(name);
}
@@ -68,4 +68,9 @@
{
return _queueMap.values();
}
+
+ public AMQQueue getQueue(String queue)
+ {
+ return getQueue(new AMQShortString(queue));
+ }
}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/Filterable.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/Filterable.java?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/Filterable.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/Filterable.java Sun Oct 25 22:58:57 2009
@@ -22,12 +22,13 @@
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.AMQException;
+import org.apache.qpid.server.message.AMQMessageHeader;
-public interface Filterable<E extends Exception>
+public interface Filterable
{
- ContentHeaderBody getContentHeaderBody() throws E;
+ AMQMessageHeader getMessageHeader();
- boolean isPersistent() throws E;
+ boolean isPersistent();
boolean isRedelivered();
}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java Sun Oct 25 22:58:57 2009
@@ -25,19 +25,22 @@
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
-import org.apache.qpid.server.txn.TransactionalContext;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.exchange.NoRouteException;
import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.message.InboundMessage;
+import org.apache.qpid.server.message.AMQMessageHeader;
+import org.apache.qpid.server.message.EnqueableMessage;
+import org.apache.qpid.server.message.MessageContentSource;
+import org.apache.qpid.server.message.MessageMetaData;
import org.apache.qpid.AMQException;
import org.apache.log4j.Logger;
import java.util.ArrayList;
-import java.util.Collection;
+import java.util.List;
+import java.nio.ByteBuffer;
-public class IncomingMessage implements Filterable<RuntimeException>
+public class IncomingMessage implements Filterable, InboundMessage, EnqueableMessage, MessageContentSource
{
/** Used for debugging purposes. */
@@ -48,12 +51,6 @@
private final MessagePublishInfo _messagePublishInfo;
private ContentHeaderBody _contentHeaderBody;
- private AMQMessageHandle _messageHandle;
- private final Long _messageId;
- private final TransactionalContext _txnContext;
-
- private static final boolean MSG_AUTH =
- ApplicationRegistry.getInstance().getConfiguration().getMsgAuth();
/**
@@ -68,23 +65,27 @@
*/
private ArrayList<AMQQueue> _destinationQueues;
- private AMQProtocolSession _publisher;
- private MessageStore _messageStore;
private long _expiration;
-
+
private Exchange _exchange;
- public IncomingMessage(final Long messageId,
- final MessagePublishInfo info,
- final TransactionalContext txnContext,
- final AMQProtocolSession publisher)
+ private int _receivedChunkCount = 0;
+ private List<ContentChunk> _contentChunks = new ArrayList<ContentChunk>();
+
+ // we keep both the original meta data object and the store reference to it just in case the
+ // store would otherwise flow it to disk
+
+ private MessageMetaData _messageMetaData;
+
+ private StoredMessage<MessageMetaData> _storedMessageHandle;
+
+
+ public IncomingMessage(
+ final MessagePublishInfo info
+ )
{
- _messageId = messageId;
_messagePublishInfo = info;
- _txnContext = txnContext;
- _publisher = publisher;
-
}
public void setContentHeaderBody(final ContentHeaderBody contentHeaderBody) throws AMQException
@@ -123,183 +124,94 @@
}
- public void routingComplete(final MessageStore store,
- final MessageHandleFactory factory) throws AMQException
+ public MessageMetaData headersReceived()
{
-
- final boolean persistent = isPersistent();
- _messageHandle = factory.createMessageHandle(_messageId, store, persistent);
- if (persistent)
- {
- _txnContext.beginTranIfNecessary();
- // enqueuing the messages ensure that if required the destinations are recorded to a
- // persistent store
-
- if(_destinationQueues != null)
- {
- for (int i = 0; i < _destinationQueues.size(); i++)
- {
- store.enqueueMessage(_txnContext.getStoreContext(),
- _destinationQueues.get(i), _messageId);
- }
- }
- }
+ _messageMetaData = new MessageMetaData(_messagePublishInfo, _contentHeaderBody, 0);
+ return _messageMetaData;
}
- public AMQMessage deliverToQueues()
- throws AMQException
- {
-
- // we get a reference to the destination queues now so that we can clear the
- // transient message data as quickly as possible
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Delivering message " + _messageId + " to " + _destinationQueues);
- }
-
- AMQMessage message = null;
-
- try
- {
- // first we allow the handle to know that the message has been fully received. This is useful if it is
- // maintaining any calculated values based on content chunks
- _messageHandle.setPublishAndContentHeaderBody(_txnContext.getStoreContext(),
- _messagePublishInfo, getContentHeaderBody());
-
-
-
- message = new AMQMessage(_messageHandle,_txnContext.getStoreContext(), _messagePublishInfo);
-
- message.setExpiration(_expiration);
- message.setClientIdentifier(_publisher.getSessionIdentifier());
-
- // we then allow the transactional context to do something with the message content
- // now that it has all been received, before we attempt delivery
- _txnContext.messageFullyReceived(isPersistent());
-
- AMQShortString userID = getContentHeaderBody().properties instanceof BasicContentHeaderProperties ?
- ((BasicContentHeaderProperties) getContentHeaderBody().properties).getUserId() : null;
-
- if (MSG_AUTH && !_publisher.getAuthorizedID().getName().equals(userID == null? "" : userID.toString()))
- {
- throw new UnauthorizedAccessException("Acccess Refused",message);
- }
-
- if ((_destinationQueues == null) || _destinationQueues.size() == 0)
- {
-
- if (isMandatory() || isImmediate())
- {
- throw new NoRouteException("No Route for message", message);
-
- }
- else
- {
- _logger.warn("MESSAGE DISCARDED: No routes for message - " + message);
- }
- }
- else
- {
- int offset;
- final int queueCount = _destinationQueues.size();
- message.incrementReference(queueCount);
- if(queueCount == 1)
- {
- offset = 0;
- }
- else
- {
- offset = ((int)(message.getMessageId().longValue())) % queueCount;
- if(offset < 0)
- {
- offset = -offset;
- }
- }
- for (int i = offset; i < queueCount; i++)
- {
- // normal deliver so add this message at the end.
- _txnContext.deliver(_destinationQueues.get(i), message);
- }
- for (int i = 0; i < offset; i++)
- {
- // normal deliver so add this message at the end.
- _txnContext.deliver(_destinationQueues.get(i), message);
- }
- }
-
- message.clearStoreContext();
- return message;
- }
- finally
- {
- // Remove refence for routing process . Reference count should now == delivered queue count
- if(message != null) message.decrementReference(_txnContext.getStoreContext());
- }
+ public ArrayList<AMQQueue> getDestinationQueues()
+ {
+ return _destinationQueues;
}
- public void addContentBodyFrame(final ContentChunk contentChunk)
+ public int addContentBodyFrame(final ContentChunk contentChunk)
throws AMQException
{
-
+ _storedMessageHandle.addContent((int)_bodyLengthReceived, contentChunk.getData().buf());
_bodyLengthReceived += contentChunk.getSize();
+ _contentChunks.add(contentChunk);
+
- _messageHandle.addContentBodyFrame(_txnContext.getStoreContext(), contentChunk, allContentReceived());
+ return _receivedChunkCount++;
}
public boolean allContentReceived()
{
- return (_bodyLengthReceived == getContentHeaderBody().bodySize);
+ return (_bodyLengthReceived == getContentHeader().bodySize);
}
- public AMQShortString getExchange() throws AMQException
+ public AMQShortString getExchange()
{
return _messagePublishInfo.getExchange();
}
- public AMQShortString getRoutingKey() throws AMQException
+ public String getRoutingKey()
{
- return _messagePublishInfo.getRoutingKey();
+ return _messagePublishInfo.getRoutingKey() == null ? null : _messagePublishInfo.getRoutingKey().toString();
}
- public boolean isMandatory() throws AMQException
+ public String getBinding()
+ {
+ return _messagePublishInfo.getRoutingKey() == null ? null : _messagePublishInfo.getRoutingKey().toString();
+ }
+
+
+ public boolean isMandatory()
{
return _messagePublishInfo.isMandatory();
}
- public boolean isImmediate() throws AMQException
+ public boolean isImmediate()
{
return _messagePublishInfo.isImmediate();
}
- public ContentHeaderBody getContentHeaderBody()
+ public ContentHeaderBody getContentHeader()
{
return _contentHeaderBody;
}
+ public AMQMessageHeader getMessageHeader()
+ {
+ return _messageMetaData.getMessageHeader();
+ }
+
public boolean isPersistent()
{
- return getContentHeaderBody().properties instanceof BasicContentHeaderProperties &&
- ((BasicContentHeaderProperties) getContentHeaderBody().properties).getDeliveryMode() ==
+ return getContentHeader().properties instanceof BasicContentHeaderProperties &&
+ ((BasicContentHeaderProperties) getContentHeader().properties).getDeliveryMode() ==
BasicContentHeaderProperties.PERSISTENT;
}
-
+
public boolean isRedelivered()
{
return false;
}
- public void setMessageStore(final MessageStore messageStore)
+
+ public long getSize()
{
- _messageStore = messageStore;
+ return getContentHeader().bodySize;
}
- public Long getMessageId()
+ public Long getMessageNumber()
{
- return _messageId;
+ return _storedMessageHandle.getMessageNumber();
}
public void setExchange(final Exchange e)
@@ -307,13 +219,82 @@
_exchange = e;
}
- public void route() throws AMQException
+ public void route()
{
- _exchange.route(this);
+ enqueue(_exchange.route(this));
+
}
public void enqueue(final ArrayList<AMQQueue> queues)
{
_destinationQueues = queues;
}
+
+ public MessagePublishInfo getMessagePublishInfo()
+ {
+ return _messagePublishInfo;
+ }
+
+ public long getExpiration()
+ {
+ return _expiration;
+ }
+
+ public int getReceivedChunkCount()
+ {
+ return _receivedChunkCount;
+ }
+
+
+ public int getBodyCount() throws AMQException
+ {
+ return _contentChunks.size();
+ }
+
+ public ContentChunk getContentChunk(int index) throws IllegalArgumentException, AMQException
+ {
+ return _contentChunks.get(index);
+ }
+
+
+ public int getContent(ByteBuffer buf, int offset)
+ {
+ int pos = 0;
+ int written = 0;
+ for(ContentChunk cb : _contentChunks)
+ {
+ ByteBuffer data = cb.getData().buf();
+ if(offset+written >= pos && offset < pos + data.limit())
+ {
+ ByteBuffer src = data.duplicate();
+ src.position(offset+written - pos);
+ src = src.slice();
+
+ if(buf.remaining() < src.limit())
+ {
+ src.limit(buf.remaining());
+ }
+ int count = src.limit();
+ buf.put(src);
+ written += count;
+ if(buf.remaining() == 0)
+ {
+ break;
+ }
+ }
+ pos+=data.limit();
+ }
+ return written;
+
+ }
+
+ public void setStoredMessage(StoredMessage<MessageMetaData> storedMessageHandle)
+ {
+ _storedMessageHandle = storedMessageHandle;
+ }
+
+ public StoredMessage<MessageMetaData> getStoredMessage()
+ {
+ return _storedMessageHandle;
+ }
}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java Sun Oct 25 22:58:57 2009
@@ -21,13 +21,14 @@
package org.apache.qpid.server.queue;
import org.apache.qpid.AMQException;
+import org.apache.qpid.server.message.ServerMessage;
public enum NotificationCheck
{
MESSAGE_COUNT_ALERT
{
- boolean notifyIfNecessary(AMQMessage msg, AMQQueue queue, QueueNotificationListener listener)
+ boolean notifyIfNecessary(ServerMessage msg, AMQQueue queue, QueueNotificationListener listener)
{
int msgCount;
final long maximumMessageCount = queue.getMaximumMessageCount();
@@ -41,26 +42,19 @@
},
MESSAGE_SIZE_ALERT(true)
{
- boolean notifyIfNecessary(AMQMessage msg, AMQQueue queue, QueueNotificationListener listener)
+ boolean notifyIfNecessary(ServerMessage msg, AMQQueue queue, QueueNotificationListener listener)
{
final long maximumMessageSize = queue.getMaximumMessageSize();
if(maximumMessageSize != 0)
{
// Check for threshold message size
long messageSize;
- try
- {
- messageSize = (msg == null) ? 0 : msg.getContentHeaderBody().bodySize;
- }
- catch (AMQException e)
- {
- messageSize = 0;
- }
+ messageSize = (msg == null) ? 0 : msg.getSize();
if (messageSize >= maximumMessageSize)
{
- listener.notifyClients(this, queue, messageSize + "b : Maximum message size threshold ("+ maximumMessageSize +") breached. [Message ID=" + msg.getMessageId() + "]");
+ listener.notifyClients(this, queue, messageSize + "b : Maximum message size threshold ("+ maximumMessageSize +") breached. [Message ID=" + msg.getMessageNumber() + "]");
return true;
}
}
@@ -70,7 +64,7 @@
},
QUEUE_DEPTH_ALERT
{
- boolean notifyIfNecessary(AMQMessage msg, AMQQueue queue, QueueNotificationListener listener)
+ boolean notifyIfNecessary(ServerMessage msg, AMQQueue queue, QueueNotificationListener listener)
{
// Check for threshold queue depth in bytes
final long maximumQueueDepth = queue.getMaximumQueueDepth();
@@ -91,7 +85,7 @@
},
MESSAGE_AGE_ALERT
{
- boolean notifyIfNecessary(AMQMessage msg, AMQQueue queue, QueueNotificationListener listener)
+ boolean notifyIfNecessary(ServerMessage msg, AMQQueue queue, QueueNotificationListener listener)
{
final long maxMessageAge = queue.getMaximumMessageAge();
@@ -133,6 +127,6 @@
return _messageSpecific;
}
- abstract boolean notifyIfNecessary(AMQMessage msg, AMQQueue queue, QueueNotificationListener listener);
+ abstract boolean notifyIfNecessary(ServerMessage msg, AMQQueue queue, QueueNotificationListener listener);
}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java Sun Oct 25 22:58:57 2009
@@ -22,6 +22,7 @@
import org.apache.qpid.framing.CommonContentHeaderProperties;
import org.apache.qpid.AMQException;
+import org.apache.qpid.server.message.ServerMessage;
public class PriorityQueueList implements QueueEntryList
{
@@ -52,26 +53,18 @@
return _queue;
}
- public QueueEntry add(AMQMessage message)
+ public QueueEntry add(ServerMessage message)
{
- try
+ int index = message.getMessageHeader().getPriority() - _priorityOffset;
+ if(index >= _priorities)
{
- int index = ((CommonContentHeaderProperties)((message.getContentHeaderBody().properties))).getPriority() - _priorityOffset;
- if(index >= _priorities)
- {
- index = _priorities-1;
- }
- else if(index < 0)
- {
- index = 0;
- }
- return _priorityLists[index].add(message);
+ index = _priorities-1;
}
- catch (AMQException e)
+ else if(index < 0)
{
- // TODO - fix AMQ Exception
- throw new RuntimeException(e);
+ index = 0;
}
+ return _priorityLists[index].add(message);
}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java Sun Oct 25 22:58:57 2009
@@ -1,8 +1,8 @@
package org.apache.qpid.server.queue;
import org.apache.qpid.AMQException;
-import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.message.ServerMessage;
/*
*
@@ -24,18 +24,19 @@
* under the License.
*
*/
-public interface QueueEntry extends Comparable<QueueEntry>
+public interface QueueEntry extends Comparable<QueueEntry>, Filterable
{
-
public static enum State
{
AVAILABLE,
ACQUIRED,
EXPIRED,
DEQUEUED,
- DELETED
+ DELETED;
+
+
}
public static interface StateChangeListener
@@ -121,6 +122,27 @@
}
}
+ public final class SubscriptionAssignedState extends EntryState
+ {
+ private final Subscription _subscription;
+
+ public SubscriptionAssignedState(Subscription subscription)
+ {
+ _subscription = subscription;
+ }
+
+
+ public State getState()
+ {
+ return State.AVAILABLE;
+ }
+
+ public Subscription getSubscription()
+ {
+ return _subscription;
+ }
+ }
+
final static EntryState AVAILABLE_STATE = new AvailableState();
final static EntryState DELETED_STATE = new DeletedState();
@@ -133,7 +155,7 @@
AMQQueue getQueue();
- AMQMessage getMessage();
+ ServerMessage getMessage();
long getSize();
@@ -150,16 +172,17 @@
boolean isDeleted();
boolean acquiredBySubscription();
-
- void setDeliveredToSubscription();
+ boolean isAcquiredBy(Subscription subscription);
void release();
+ boolean releaseButRetain();
- String debugIdentity();
boolean immediateAndNotDelivered();
- void setRedelivered(boolean b);
+ void setRedelivered();
+
+ boolean isRedelivered();
Subscription getDeliveredSubscription();
@@ -169,13 +192,15 @@
boolean isRejectedBy(Subscription subscription);
- void requeue(StoreContext storeContext) throws AMQException;
+ void requeue(Subscription subscription);
+
+ void dequeue();
- void dequeue(final StoreContext storeContext) throws FailedDequeueException;
+ void dispose();
- void dispose(final StoreContext storeContext) throws MessageCleanupException;
+ void discard();
- void discard(StoreContext storeContext) throws FailedDequeueException, MessageCleanupException;
+ void routeToAlternate();
boolean isQueueDeleted();
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java Sun Oct 25 22:58:57 2009
@@ -21,12 +21,16 @@
package org.apache.qpid.server.queue;
import org.apache.qpid.AMQException;
-import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.message.MessageReference;
+import org.apache.qpid.server.message.AMQMessageHeader;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.txn.AutoCommitTransaction;
+import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.log4j.Logger;
-import java.util.Set;
-import java.util.HashSet;
+import java.util.*;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.CopyOnWriteArraySet;
@@ -42,7 +46,7 @@
private final SimpleQueueEntryList _queueEntryList;
- private final AMQMessage _message;
+ private MessageReference _message;
private Set<Subscription> _rejectedBy = null;
@@ -75,6 +79,11 @@
volatile QueueEntryImpl _next;
+ private static final int DELIVERED_TO_CONSUMER = 1;
+ private static final int REDELIVERED = 2;
+
+ private volatile int _deliveryState;
+
QueueEntryImpl(SimpleQueueEntryList queueEntryList)
{
@@ -83,18 +92,19 @@
}
- public QueueEntryImpl(SimpleQueueEntryList queueEntryList, AMQMessage message, final long entryId)
+ public QueueEntryImpl(SimpleQueueEntryList queueEntryList, ServerMessage message, final long entryId)
{
_queueEntryList = queueEntryList;
- _message = message;
+
+ _message = message == null ? null : message.newReference();
_entryIdUpdater.set(this, entryId);
}
- public QueueEntryImpl(SimpleQueueEntryList queueEntryList, AMQMessage message)
+ public QueueEntryImpl(SimpleQueueEntryList queueEntryList, ServerMessage message)
{
_queueEntryList = queueEntryList;
- _message = message;
+ _message = message == null ? null : message.newReference();
}
protected void setEntryId(long entryId)
@@ -112,24 +122,36 @@
return _queueEntryList.getQueue();
}
- public AMQMessage getMessage()
+ public ServerMessage getMessage()
{
- return _message;
+ return _message == null ? null : _message.getMessage();
}
public long getSize()
{
- return getMessage().getSize();
+ return getMessage() == null ? 0 : getMessage().getSize();
}
public boolean getDeliveredToConsumer()
{
- return getMessage().getDeliveredToConsumer();
+ return (_deliveryState & DELIVERED_TO_CONSUMER) != 0;
}
public boolean expired() throws AMQException
{
- return getMessage().expired(getQueue());
+ ServerMessage message = getMessage();
+ if(message != null)
+ {
+ long expiration = message.getExpiration();
+ if (expiration != 0L)
+ {
+ long now = System.currentTimeMillis();
+
+ return (now > expiration);
+ }
+ }
+ return false;
+
}
public boolean isAcquired()
@@ -145,6 +167,24 @@
private boolean acquire(final EntryState state)
{
boolean acquired = _stateUpdater.compareAndSet(this,AVAILABLE_STATE, state);
+
+ // deal with the case where the node has been assigned to a given subscription already
+ // including the case that the node is assigned to a closed subscription
+ if(!acquired)
+ {
+ if(state != NON_SUBSCRIPTION_ACQUIRED_STATE)
+ {
+ EntryState currentState = _state;
+ if(currentState.getState() == State.AVAILABLE
+ && ((currentState == AVAILABLE_STATE)
+ || (((SubscriptionAcquiredState)state).getSubscription() ==
+ ((SubscriptionAssignedState)currentState).getSubscription())
+ || ((SubscriptionAssignedState)currentState).getSubscription().isClosed() ))
+ {
+ acquired = _stateUpdater.compareAndSet(this,currentState, state);
+ }
+ }
+ }
if(acquired && _stateChangeListeners != null)
{
notifyStateChange(State.AVAILABLE, State.ACQUIRED);
@@ -155,7 +195,12 @@
public boolean acquire(Subscription sub)
{
- return acquire(sub.getOwningState());
+ final boolean acquired = acquire(sub.getOwningState());
+ if(acquired)
+ {
+ _deliveryState |= DELIVERED_TO_CONSUMER;
+ }
+ return acquired;
}
public boolean acquiredBySubscription()
@@ -164,38 +209,89 @@
return (_state instanceof SubscriptionAcquiredState);
}
- public void setDeliveredToSubscription()
+ public boolean isAcquiredBy(Subscription subscription)
{
- getMessage().setDeliveredToConsumer();
+ EntryState state = _state;
+ return state instanceof SubscriptionAcquiredState
+ && ((SubscriptionAcquiredState)state).getSubscription() == subscription;
}
public void release()
{
_stateUpdater.set(this,AVAILABLE_STATE);
- }
+ if(!getQueue().isDeleted())
+ {
+ getQueue().requeue(this);
+ if(_stateChangeListeners != null)
+ {
+ notifyStateChange(QueueEntry.State.ACQUIRED, QueueEntry.State.AVAILABLE);
+ }
- public String debugIdentity()
- {
- AMQMessage message = getMessage();
- if (message == null)
+ }
+ else if(acquire())
{
- return "null";
+ routeToAlternate();
}
- else
+
+
+ }
+
+ public boolean releaseButRetain()
+ {
+ EntryState state = _state;
+
+ boolean stateUpdated = false;
+
+ if(state instanceof SubscriptionAcquiredState)
{
- return message.debugIdentity();
+ Subscription sub = ((SubscriptionAcquiredState) state).getSubscription();
+ if(_stateUpdater.compareAndSet(this, state, sub.getAssignedState()))
+ {
+ System.err.println("Message released (and retained)" + getMessage().getMessageNumber());
+ getQueue().requeue(this);
+ if(_stateChangeListeners != null)
+ {
+ notifyStateChange(QueueEntry.State.ACQUIRED, QueueEntry.State.AVAILABLE);
+ }
+ stateUpdated = true;
+ }
}
+
+ return stateUpdated;
+
}
+ public boolean immediateAndNotDelivered()
+ {
+ return !getDeliveredToConsumer() && isImmediate();
+ }
+
+ private boolean isImmediate()
+ {
+ final ServerMessage message = getMessage();
+ return message != null && message.isImmediate();
+ }
- public boolean immediateAndNotDelivered()
+ public void setRedelivered()
{
- return getMessage().immediateAndNotDelivered();
+ _deliveryState |= REDELIVERED;
}
- public void setRedelivered(boolean b)
+ public AMQMessageHeader getMessageHeader()
{
- getMessage().setRedelivered(b);
+ final ServerMessage message = getMessage();
+ return message == null ? null : message.getMessageHeader();
+ }
+
+ public boolean isPersistent()
+ {
+ final ServerMessage message = getMessage();
+ return message != null && message.isPersistent();
+ }
+
+ public boolean isRedelivered()
+ {
+ return (_deliveryState & REDELIVERED) != 0;
}
public Subscription getDeliveredSubscription()
@@ -230,12 +326,12 @@
}
else
{
- _log.warn("Requesting rejection by null subscriber:" + debugIdentity());
+ _log.warn("Requesting rejection by null subscriber:" + this);
}
}
public boolean isRejectedBy(Subscription subscription)
- {
+ {
if (_rejectedBy != null) // We have subscriptions that rejected this message
{
@@ -247,17 +343,16 @@
}
}
-
- public void requeue(final StoreContext storeContext) throws AMQException
+ public void requeue(Subscription subscription)
{
- getQueue().requeue(storeContext, this);
+ getQueue().requeue(this, subscription);
if(_stateChangeListeners != null)
{
notifyStateChange(QueueEntry.State.ACQUIRED, QueueEntry.State.AVAILABLE);
}
}
- public void dequeue(final StoreContext storeContext) throws FailedDequeueException
+ public void dequeue()
{
EntryState state = _state;
@@ -266,10 +361,10 @@
if (state instanceof SubscriptionAcquiredState)
{
Subscription s = ((SubscriptionAcquiredState) state).getSubscription();
- s.restoreCredit(this);
+ s.onDequeue(this);
}
- getQueue().dequeue(storeContext, this);
+ getQueue().dequeue(this);
if(_stateChangeListeners != null)
{
notifyStateChange(state.getState() , QueueEntry.State.DEQUEUED);
@@ -287,23 +382,74 @@
}
}
- public void dispose(final StoreContext storeContext) throws MessageCleanupException
+ public void dispose()
{
if(delete())
{
- getMessage().decrementReference(storeContext);
+ _message.release();
}
}
- public void discard(StoreContext storeContext) throws FailedDequeueException, MessageCleanupException
+ public void discard()
{
//if the queue is null then the message is waiting to be acked, but has been removed.
if (getQueue() != null)
{
- dequeue(storeContext);
+ dequeue();
}
- dispose(storeContext);
+ dispose();
+ }
+
+ public void routeToAlternate()
+ {
+ final AMQQueue currentQueue = getQueue();
+ Exchange alternateExchange = currentQueue.getAlternateExchange();
+
+ if(alternateExchange != null)
+ {
+ final List<AMQQueue> rerouteQueues = alternateExchange.route(new InboundMessageAdapter(this));
+ final ServerMessage message = getMessage();
+ if(rerouteQueues != null && rerouteQueues.size() != 0)
+ {
+ ServerTransaction txn = new AutoCommitTransaction(getQueue().getVirtualHost().getTransactionLog());
+
+ txn.enqueue(rerouteQueues, message, new ServerTransaction.Action() {
+ public void postCommit()
+ {
+ try
+ {
+ for(AMQQueue queue : rerouteQueues)
+ {
+ QueueEntry entry = queue.enqueue(message);
+ }
+ }
+ catch (AMQException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public void onRollback()
+ {
+
+ }
+ });
+ txn.dequeue(currentQueue,message,
+ new ServerTransaction.Action()
+ {
+ public void postCommit()
+ {
+ discard();
+ }
+
+ public void onRollback()
+ {
+
+ }
+ });
+ }
+ }
}
public boolean isQueueDeleted()
@@ -379,7 +525,7 @@
if(state != DELETED_STATE && _stateUpdater.compareAndSet(this,state,DELETED_STATE))
{
- _queueEntryList.advanceHead();
+ _queueEntryList.advanceHead();
return true;
}
else
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java Sun Oct 25 22:58:57 2009
@@ -20,11 +20,13 @@
*/
package org.apache.qpid.server.queue;
+import org.apache.qpid.server.message.ServerMessage;
+
public interface QueueEntryList
{
AMQQueue getQueue();
- QueueEntry add(AMQMessage message);
+ QueueEntry add(ServerMessage message);
QueueEntry next(QueueEntry node);
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java Sun Oct 25 22:58:57 2009
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -30,9 +30,9 @@
{
VirtualHost getVirtualHost();
- void registerQueue(AMQQueue queue) throws AMQException;
+ void registerQueue(AMQQueue queue);
- void unregisterQueue(AMQShortString name) throws AMQException;
+ void unregisterQueue(AMQShortString name);
AMQQueue getQueue(AMQShortString name);
@@ -40,4 +40,5 @@
Collection<AMQQueue> getQueues();
+ AMQQueue getQueue(String queue);
}
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org