You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ar...@apache.org on 2007/09/19 17:03:29 UTC
svn commit: r577326 -
/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
Author: arnaudsimon
Date: Wed Sep 19 08:03:28 2007
New Revision: 577326
URL: http://svn.apache.org/viewvc?rev=577326&view=rev
Log:
Updated to use the JMS message for filtering incoming messages
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java?rev=577326&r1=577325&r2=577326&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java Wed Sep 19 08:03:28 2007
@@ -96,12 +96,17 @@
}
// ----- Interface org.apache.qpidity.client.util.MessageListener
- public void onMessage(Message message)
+
+ /**
+ * @param jmsMessage this message has already been processed so can't redo preDeliver
+ * @param channelId
+ */
+ public void notifyMessage(AbstractJMSMessage jmsMessage, int channelId)
{
boolean messageOk = false;
try
{
- messageOk = checkPreConditions(message);
+ messageOk = checkPreConditions(jmsMessage);
}
catch (AMQException e)
{
@@ -112,44 +117,51 @@
}
catch (JMSException e1)
{
- // we should silently log thie exception as it only hanppens when the connection is closed
- _logger.error("Exception when receiving message", e1);
+ // we should silently log this exception as it only happens when the connection is closed
+ _logger.error("Exception when receiving message", e1);
}
}
if (messageOk)
{
- int channelId = getSession().getChannelId();
- long deliveryId = message.getMessageTransferId();
- String consumerTag = getConsumerTag().toString();
- AMQShortString exchange = new AMQShortString(message.getDeliveryProperties().getExchange());
- AMQShortString routingKey = new AMQShortString(message.getDeliveryProperties().getRoutingKey());
- boolean redelivered = message.getDeliveryProperties().getRedelivered();
- UnprocessedMessage_0_10 newMessage =
- new UnprocessedMessage_0_10(channelId, deliveryId, consumerTag, exchange, routingKey, redelivered);
- try
- {
- newMessage.receiveBody(message.readData());
- }
- catch (IOException e)
- {
- getSession().getAMQConnection().exceptionReceived(e);
- }
- Struct[] headers = {message.getMessageProperties(), message.getDeliveryProperties()};
- // if there is a replyto destination then we need to request the exchange info
- if (!message.getMessageProperties().getReplyTo().getExchangeName().equals(""))
- {
- Future<ExchangeQueryResult> future = ((AMQSession_0_10) getSession()).getQpidSession()
- .exchangeQuery(message.getMessageProperties().getReplyTo().getExchangeName());
- ExchangeQueryResult res = future.get();
- // <exch_class>://<exch_name>/[<destination>]/[<queue>]?<option>='<value>'[,<option>='<value>']*
- String replyToUrl = res.getType() + "://" + message.getMessageProperties().getReplyTo()
- .getExchangeName() + "/" + message.getMessageProperties().getReplyTo()
- .getRoutingKey() + "/" + message.getMessageProperties().getReplyTo().getRoutingKey();
- newMessage.setReplyToURL(replyToUrl);
- }
- newMessage.setContentHeader(headers);
- getSession().messageReceived(newMessage);
+ super.notifyMessage(jmsMessage, channelId);
+ }
+ }
+
+
+ public void onMessage(Message message)
+ {
+ boolean messageOk = false;
+ int channelId = getSession().getChannelId();
+ long deliveryId = message.getMessageTransferId();
+ String consumerTag = getConsumerTag().toString();
+ AMQShortString exchange = new AMQShortString(message.getDeliveryProperties().getExchange());
+ AMQShortString routingKey = new AMQShortString(message.getDeliveryProperties().getRoutingKey());
+ boolean redelivered = message.getDeliveryProperties().getRedelivered();
+ UnprocessedMessage_0_10 newMessage =
+ new UnprocessedMessage_0_10(channelId, deliveryId, consumerTag, exchange, routingKey, redelivered);
+ try
+ {
+ newMessage.receiveBody(message.readData());
+ }
+ catch (IOException e)
+ {
+ getSession().getAMQConnection().exceptionReceived(e);
+ }
+ Struct[] headers = {message.getMessageProperties(), message.getDeliveryProperties()};
+ // if there is a replyto destination then we need to request the exchange info
+ if (!message.getMessageProperties().getReplyTo().getExchangeName().equals(""))
+ {
+ Future<ExchangeQueryResult> future = ((AMQSession_0_10) getSession()).getQpidSession()
+ .exchangeQuery(message.getMessageProperties().getReplyTo().getExchangeName());
+ ExchangeQueryResult res = future.get();
+ // <exch_class>://<exch_name>/[<destination>]/[<queue>]?<option>='<value>'[,<option>='<value>']*
+ String replyToUrl = res.getType() + "://" + message.getMessageProperties().getReplyTo()
+ .getExchangeName() + "/" + message.getMessageProperties().getReplyTo()
+ .getRoutingKey() + "/" + message.getMessageProperties().getReplyTo().getRoutingKey();
+ newMessage.setReplyToURL(replyToUrl);
}
+ newMessage.setContentHeader(headers);
+ getSession().messageReceived(newMessage);
// else ignore this message
}
@@ -201,7 +213,7 @@
* @return true if the message matches the selector and can be acquired, false otherwise.
* @throws AMQException If the message preConditions cannot be checked due to some internal error.
*/
- private boolean checkPreConditions(Message message) throws AMQException
+ private boolean checkPreConditions(AbstractJMSMessage message) throws AMQException
{
boolean messageOk = true;
// TODO Use a tag for fiding out if message filtering is done here or by the broker.
@@ -258,12 +270,12 @@
* @param message The message to be acknowledged
* @throws AMQException If the message cannot be acquired due to some internal error.
*/
- private void acknowledgeMessage(Message message) throws AMQException
+ private void acknowledgeMessage(AbstractJMSMessage message) throws AMQException
{
if (!_preAcquire)
{
RangeSet ranges = new RangeSet();
- ranges.add(message.getMessageTransferId());
+ ranges.add(message.getDeliveryTag());
_0_10session.getQpidSession().messageAcknowledge(ranges);
_0_10session.getCurrentException();
}
@@ -275,12 +287,12 @@
* @param message The message to be released
* @throws AMQException If the message cannot be released due to some internal error.
*/
- private void releaseMessage(Message message) throws AMQException
+ private void releaseMessage(AbstractJMSMessage message) throws AMQException
{
if (_preAcquire)
{
RangeSet ranges = new RangeSet();
- ranges.add(message.getMessageTransferId());
+ ranges.add(message.getDeliveryTag());
_0_10session.getQpidSession().messageRelease(ranges);
_0_10session.getCurrentException();
}
@@ -293,13 +305,13 @@
* @return true if the message has been acquired, false otherwise.
* @throws AMQException If the message cannot be acquired due to some internal error.
*/
- private boolean acquireMessage(Message message) throws AMQException
+ private boolean acquireMessage(AbstractJMSMessage message) throws AMQException
{
boolean result = false;
if (!_preAcquire)
{
RangeSet ranges = new RangeSet();
- ranges.add(message.getMessageTransferId());
+ ranges.add(message.getDeliveryTag());
_0_10session.getQpidSession()
.messageAcquire(ranges, org.apache.qpidity.nclient.Session.MESSAGE_ACQUIRE_ANY_AVAILABLE_MESSAGE);