You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2007/01/10 09:31:43 UTC
svn commit: r494765 - in
/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server:
filter/PropertyExpression.java queue/AMQMessage.java
Author: rgreig
Date: Wed Jan 10 00:31:42 2007
New Revision: 494765
URL: http://svn.apache.org/viewvc?view=rev&rev=494765
Log:
QPID-273 : (Patch supplied by Rob Godfrey) Remove unnecessary Map creation
Modified:
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java?view=diff&rev=494765&r1=494764&r2=494765
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java Wed Jan 10 00:31:42 2007
@@ -26,6 +26,7 @@
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
+import javax.jms.Message;
//import org.apache.activemq.command.ActiveMQDestination;
//import org.apache.activemq.command.Message;
@@ -45,225 +46,232 @@
public class PropertyExpression implements Expression
{
- interface SubExpression
- {
- public Object evaluate(AMQMessage message) throws AMQException;
- }
-
- interface JMSExpression
- {
- public abstract Object evaluate(JMSMessage message);
- }
-
- static class SubJMSExpression implements SubExpression
- {
- JMSExpression _expression;
-
- SubJMSExpression(JMSExpression expression)
- {
- _expression = expression;
- }
-
- public Object evaluate(AMQMessage message) throws AMQException
- {
- JMSMessage msg = (JMSMessage) message.getDecodedMessage(AMQMessage.JMS_MESSAGE);
- if (msg != null)
- {
- return _expression.evaluate(msg);
- }
- else
- {
- return null;
- }
- }
- }
private final static Logger _logger = org.apache.log4j.Logger.getLogger(PropertyExpression.class);
- static final private HashMap JMS_PROPERTY_EXPRESSIONS = new HashMap();
+ static final private HashMap<String, Expression> JMS_PROPERTY_EXPRESSIONS = new HashMap<String, Expression>();
static
{
- JMS_PROPERTY_EXPRESSIONS.put("JMSDestination", new SubJMSExpression(
- new JMSExpression()
+ JMS_PROPERTY_EXPRESSIONS.put("JMSDestination",
+ new Expression()
{
- public Object evaluate(JMSMessage message)
+ public Object evaluate(AMQMessage message)
{
- return message.getJMSDestination();
+ //TODO
+ return null;
}
}
- ));
-//
-// public Object evaluate(AMQMessage message)
-// {
-// //fixme
-//
-//
-//// AMQDestination dest = message.getOriginalDestination();
-//// if (dest == null)
-//// {
-//// dest = message.getDestination();
-//// }
-//// if (dest == null)
-//// {
-//// return null;
-//// }
-//// return dest.toString();
-// return "";
-// }
-// });
- JMS_PROPERTY_EXPRESSIONS.put("JMSReplyTo", new SubJMSExpression(
- new JMSExpression()
- {
- public Object evaluate(JMSMessage message)
+ );
+ JMS_PROPERTY_EXPRESSIONS.put("JMSReplyTo", new Expression()
+ {
+ public Object evaluate(AMQMessage message)
{
- return message.getJMSReplyTo();
+ try
+ {
+ BasicContentHeaderProperties _properties = (BasicContentHeaderProperties) message.getContentHeaderBody().properties;
+ return _properties.getReplyTo();
+ }
+ catch (AMQException e)
+ {
+ _logger.warn(e);
+ return null;
+ }
+
}
- })
- );
- JMS_PROPERTY_EXPRESSIONS.put("JMSType", new SubJMSExpression(
- new JMSExpression()
+ });
+
+ JMS_PROPERTY_EXPRESSIONS.put("JMSType",
+ new Expression()
{
- public Object evaluate(JMSMessage message)
+ public Object evaluate(AMQMessage message)
{
- return message.getJMSType();
+ try
+ {
+ BasicContentHeaderProperties _properties = (BasicContentHeaderProperties) message.getContentHeaderBody().properties;
+ return _properties.getType();
+ }
+ catch (AMQException e)
+ {
+ _logger.warn(e);
+ return null;
+ }
+
}
}
- ));
+ );
- JMS_PROPERTY_EXPRESSIONS.put("JMSDeliveryMode", new SubJMSExpression(
- new JMSExpression()
+ JMS_PROPERTY_EXPRESSIONS.put("JMSDeliveryMode",
+ new Expression()
{
- public Object evaluate(JMSMessage message)
+ public Object evaluate(AMQMessage message)
{
try
{
- Integer mode = new Integer(message.getAMQMessage().isPersistent() ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
- _logger.info("JMSDeliveryMode is :" + mode);
+ int mode = message.isPersistent() ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT;
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("JMSDeliveryMode is :" + mode);
+ }
return mode;
}
catch (AMQException e)
{
- //shouldn't happen
+ _logger.warn(e);
}
return DeliveryMode.NON_PERSISTENT;
}
- }));
+ });
- JMS_PROPERTY_EXPRESSIONS.put("JMSPriority", new SubJMSExpression(
- new JMSExpression()
+ JMS_PROPERTY_EXPRESSIONS.put("JMSPriority",
+ new Expression()
{
- public Object evaluate(JMSMessage message)
+ public Object evaluate(AMQMessage message)
{
- return message.getJMSPriority();
+ try
+ {
+ BasicContentHeaderProperties _properties = (BasicContentHeaderProperties) message.getContentHeaderBody().properties;
+ return (int) _properties.getPriority();
+ }
+ catch (AMQException e)
+ {
+ _logger.warn(e);
+ }
+ return Message.DEFAULT_PRIORITY;
}
}
- ));
+ );
- JMS_PROPERTY_EXPRESSIONS.put("AMQMessageID", new SubJMSExpression(
- new JMSExpression()
+ JMS_PROPERTY_EXPRESSIONS.put("AMQMessageID",
+ new Expression()
{
- public Object evaluate(JMSMessage message)
+ public Object evaluate(AMQMessage message)
{
- return message.getAMQMessage().getMessageId();
+
+ try
+ {
+ BasicContentHeaderProperties _properties = (BasicContentHeaderProperties) message.getContentHeaderBody().properties;
+ return _properties.getMessageId();
+ }
+ catch (AMQException e)
+ {
+ _logger.warn(e);
+ return null;
+ }
+
}
}
- ));
+ );
- JMS_PROPERTY_EXPRESSIONS.put("JMSTimestamp", new SubJMSExpression(
- new JMSExpression()
+ JMS_PROPERTY_EXPRESSIONS.put("JMSTimestamp",
+ new Expression()
{
- public Object evaluate(JMSMessage message)
+ public Object evaluate(AMQMessage message)
{
- return message.getJMSTimestamp();
+
+ try
+ {
+ BasicContentHeaderProperties _properties = (BasicContentHeaderProperties) message.getContentHeaderBody().properties;
+ return _properties.getTimestamp();
+ }
+ catch (AMQException e)
+ {
+ _logger.warn(e);
+ return null;
+ }
+
}
}
- ));
+ );
- JMS_PROPERTY_EXPRESSIONS.put("JMSCorrelationID", new SubJMSExpression(
- new JMSExpression()
+ JMS_PROPERTY_EXPRESSIONS.put("JMSCorrelationID",
+ new Expression()
{
- public Object evaluate(JMSMessage message)
+ public Object evaluate(AMQMessage message)
{
- return message.getJMSCorrelationID();
+
+ try
+ {
+ BasicContentHeaderProperties _properties = (BasicContentHeaderProperties) message.getContentHeaderBody().properties;
+ return _properties.getCorrelationId();
+ }
+ catch (AMQException e)
+ {
+ _logger.warn(e);
+ return null;
+ }
+
}
}
- ));
+ );
- JMS_PROPERTY_EXPRESSIONS.put("JMSExpiration", new SubJMSExpression(
- new JMSExpression()
+ JMS_PROPERTY_EXPRESSIONS.put("JMSExpiration",
+ new Expression()
{
- public Object evaluate(JMSMessage message)
+ public Object evaluate(AMQMessage message)
{
- return message.getJMSExpiration();
+
+ try
+ {
+ BasicContentHeaderProperties _properties = (BasicContentHeaderProperties) message.getContentHeaderBody().properties;
+ return _properties.getExpiration();
+ }
+ catch (AMQException e)
+ {
+ _logger.warn(e);
+ return null;
+ }
+
}
- }
- ));
+ });
- JMS_PROPERTY_EXPRESSIONS.put("JMSRedelivered", new SubJMSExpression(
- new JMSExpression()
+ JMS_PROPERTY_EXPRESSIONS.put("JMSRedelivered",
+ new Expression()
{
- public Object evaluate(JMSMessage message)
+ public Object evaluate(AMQMessage message)
{
- return message.getAMQMessage().isRedelivered();
+ return message.isRedelivered();
}
}
- ));
+ );
}
private final String name;
- private final SubExpression jmsPropertyExpression;
+ private final Expression jmsPropertyExpression;
public PropertyExpression(String name)
{
this.name = name;
- jmsPropertyExpression = (SubExpression) JMS_PROPERTY_EXPRESSIONS.get(name);
+ jmsPropertyExpression = JMS_PROPERTY_EXPRESSIONS.get(name);
}
public Object evaluate(AMQMessage message) throws AMQException
{
-// try
-// {
-// if (message.isDropped())
-// {
-// return null;
-// }
if (jmsPropertyExpression != null)
{
return jmsPropertyExpression.evaluate(message);
}
-// try
+
else
{
BasicContentHeaderProperties _properties = (BasicContentHeaderProperties) message.getContentHeaderBody().properties;
- _logger.info("Looking up property:" + name);
- _logger.info("Properties are:" + _properties.getHeaders().keySet());
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("Looking up property:" + name);
+ _logger.debug("Properties are:" + _properties.getHeaders().keySet());
+ }
return _properties.getHeaders().getObject(name);
}
-// catch (IOException ioe)
-// {
-// JMSException exception = new JMSException("Could not get property: " + name + " reason: " + ioe.getMessage());
-// exception.initCause(ioe);
-// throw exception;
-// }
-// }
-// catch (IOException e)
-// {
-// JMSException exception = new JMSException(e.getMessage());
-// exception.initCause(e);
-// throw exception;
-// }
-
}
public String getName()
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java?view=diff&rev=494765&r1=494764&r2=494765
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java Wed Jan 10 00:31:42 2007
@@ -48,7 +48,7 @@
/**
* Used in clustering
*/
- private final Set<Object> _tokens = new HashSet<Object>();
+ private Set<Object> _tokens;
/**
* Only use in clustering - should ideally be removed?
@@ -71,7 +71,6 @@
*/
private boolean _deliveredToConsumer;
- private ConcurrentHashMap<String, MessageDecorator> _decodedMessages;
private AtomicBoolean _taken = new AtomicBoolean(false);
private TransientMessageData _transientMessageData = new TransientMessageData();
@@ -167,7 +166,7 @@
_messageId = messageId;
_txnContext = txnContext;
_transientMessageData.setPublishBody(publishBody);
- _decodedMessages = new ConcurrentHashMap<String, MessageDecorator>();
+
_taken = new AtomicBoolean(false);
if (_log.isDebugEnabled())
{
@@ -396,39 +395,6 @@
}
- public MessageDecorator getDecodedMessage(String type) throws AMQException
- {
- MessageDecorator msgtype = null;
-
- if (_decodedMessages != null)
- {
- msgtype = _decodedMessages.get(type);
-
- if (msgtype == null)
- {
- msgtype = decorateMessage(type);
- }
- }
-
- return msgtype;
- }
-
- private MessageDecorator decorateMessage(String type) throws AMQException
- {
- MessageDecorator msgdec = null;
-
- if (type.equals(JMS_MESSAGE))
- {
- msgdec = new JMSMessage(this);
- }
-
- if (msgdec != null)
- {
- _decodedMessages.put(type, msgdec);
- }
-
- return msgdec;
- }
public boolean taken()
{
@@ -442,6 +408,12 @@
public boolean checkToken(Object token)
{
+
+ if(_tokens==null)
+ {
+ _tokens = new HashSet<Object>();
+ }
+
if (_tokens.contains(token))
{
return true;
@@ -569,32 +541,40 @@
AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
getContentHeaderBody());
- Iterator<AMQDataBlock> bodyFrameIterator = getBodyFrameIterator(channelId);
- //
- // Optimise the case where we have a single content body. In that case we create a composite block
- // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
- //
- if (bodyFrameIterator.hasNext())
+ final int bodyCount = _messageHandle.getBodyCount(_messageId);
+ if(bodyCount == 0)
{
- AMQDataBlock firstContentBody = bodyFrameIterator.next();
- AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody};
- CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(deliver, headerAndFirstContent);
+ SmallCompositeAMQDataBlock compositeBlock = new SmallCompositeAMQDataBlock(deliver,
+ contentHeader);
protocolSession.writeFrame(compositeBlock);
}
else
{
- CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(deliver,
- new AMQDataBlock[]{contentHeader});
+
+
+ //
+ // Optimise the case where we have a single content body. In that case we create a composite block
+ // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
+ //
+ ContentBody cb = _messageHandle.getContentBody(_messageId, 0);
+
+ AMQDataBlock firstContentBody = ContentBody.createAMQFrame(channelId, cb);
+ AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody};
+ CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(deliver, headerAndFirstContent);
protocolSession.writeFrame(compositeBlock);
- }
- //
- // Now start writing out the other content bodies
- //
- while (bodyFrameIterator.hasNext())
- {
- protocolSession.writeFrame(bodyFrameIterator.next());
+ //
+ // Now start writing out the other content bodies
+ //
+ for(int i = 1; i < bodyCount; i++)
+ {
+ cb = _messageHandle.getContentBody(_messageId, i);
+ protocolSession.writeFrame(ContentBody.createAMQFrame(channelId, cb));
+ }
+
+
}
+
}