You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2007/01/17 22:33:04 UTC
svn commit: r497179 - in
/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client:
AMQSession.java BasicMessageConsumer.java BasicMessageProducer.java
handler/MessageTransferMethodHandler.java
Author: kpvdr
Date: Wed Jan 17 13:33:03 2007
New Revision: 497179
URL: http://svn.apache.org/viewvc?view=rev&rev=497179
Log:
Resolved the remaining compile problems in the client, except for a missing line in UnprocessedMessage.java.
Modified:
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageTransferMethodHandler.java
Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?view=diff&rev=497179&r1=497178&r2=497179
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Wed Jan 17 13:33:03 2007
@@ -797,7 +797,7 @@
_inRecovery = inRecovery;
}
- public void acknowledge() throws JMSException
+ public void acknowledge() throws JMSException, AMQException
{
if (isClosed())
{
Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?view=diff&rev=497179&r1=497178&r2=497179
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java Wed Jan 17 13:33:03 2007
@@ -239,14 +239,21 @@
if (messageListener != null)
{
- //handle case where connection has already been started, and the dispatcher is blocked
- //doing a put on the _synchronousQueue
- AbstractJMSMessage jmsMsg = (AbstractJMSMessage)_synchronousQueue.poll();
- if (jmsMsg != null)
- {
- preApplicationProcessing(jmsMsg);
- messageListener.onMessage(jmsMsg);
- postDeliver(jmsMsg);
+ try
+ {
+ //handle case where connection has already been started, and the dispatcher is blocked
+ //doing a put on the _synchronousQueue
+ AbstractJMSMessage jmsMsg = (AbstractJMSMessage)_synchronousQueue.poll();
+ if (jmsMsg != null)
+ {
+ preApplicationProcessing(jmsMsg);
+ messageListener.onMessage(jmsMsg);
+ postDeliver(jmsMsg);
+ }
+ }
+ catch (AMQException e)
+ {
+ throw new JMSException(e.toString());
}
}
}
@@ -361,6 +368,10 @@
_logger.warn("Interrupted: " + e);
return null;
}
+ catch (AMQException e)
+ {
+ throw new JMSException(e.toString());
+ }
finally
{
releaseReceiving();
@@ -402,6 +413,10 @@
return m;
}
+ catch (AMQException e)
+ {
+ throw new JMSException(e.toString());
+ }
finally
{
releaseReceiving();
@@ -501,12 +516,12 @@
{
if (_logger.isDebugEnabled())
{
- _logger.debug("notifyMessage called with message number " + messageFrame.content.getDestination());
+ _logger.debug("notifyMessage called with message number " + messageFrame.deliveryTag);
}
try
{
- AbstractJMSMessage jmsMessage = _messageFactory.createMessage(messageFrame.contentHeader.getDestination(),
- messageFrame.contentHeader.getr,
+ AbstractJMSMessage jmsMessage = _messageFactory.createMessage(messageFrame.deliveryTag,
+ false,
messageFrame.contentHeader,
messageFrame.content);
@@ -541,7 +556,7 @@
}
}
- private void preDeliver(AbstractJMSMessage msg)
+ private void preDeliver(AbstractJMSMessage msg) throws AMQException
{
switch (_acknowledgeMode)
{
@@ -556,7 +571,7 @@
}
}
- private void postDeliver(AbstractJMSMessage msg) throws JMSException
+ private void postDeliver(AbstractJMSMessage msg) throws JMSException, AMQException
{
msg.setJMSDestination(_destination);
switch (_acknowledgeMode)
@@ -608,7 +623,7 @@
/**
* Acknowledge up to last message delivered (if any). Used when committing.
*/
- void acknowledgeLastDelivered()
+ void acknowledgeLastDelivered() throws AMQException
{
if (_lastDeliveryTag > 0)
{
@@ -666,7 +681,7 @@
}
}
- public void acknowledge() throws JMSException
+ public void acknowledge() throws JMSException, AMQException
{
if(!isClosed())
{
Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java?view=diff&rev=497179&r1=497178&r2=497179
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java Wed Jan 17 13:33:03 2007
@@ -106,6 +106,7 @@
protected BasicMessageProducer(AMQConnection connection, AMQDestination destination, boolean transacted,
int channelId, AMQSession session, AMQProtocolHandler protocolHandler,
long producerId, boolean immediate, boolean mandatory, boolean waitUntilSent)
+ throws AMQException
{
_connection = connection;
_destination = destination;
@@ -131,7 +132,7 @@
}
}
- private void declareDestination(AMQDestination destination)
+ private void declareDestination(AMQDestination destination) throws AMQException
{
// Declare the exchange
// Note that the durable and internal arguments are ignored since passive is set to false
@@ -344,7 +345,7 @@
public void send(Destination destination, Message message, int deliveryMode,
int priority, long timeToLive, boolean mandatory,
boolean immediate, boolean waitUntilSent)
- throws JMSException
+ throws JMSException, AMQException
{
checkPreConditions();
checkDestination(destination);
@@ -494,7 +495,14 @@
throw new JMSException("Unsupported destination class: " +
(destination != null ? destination.getClass() : null));
}
- declareDestination((AMQDestination) destination);
+ try
+ {
+ declareDestination((AMQDestination) destination);
+ }
+ catch (AMQException e)
+ {
+ throw new JMSException(e.toString());
+ }
}
protected void sendImpl(AMQDestination destination, Message message, int deliveryMode, int priority,
@@ -560,32 +568,39 @@
}
for (int i = 0; i < content.length; i++)
{
- AMQMethodBody methodBody = MessageTransferBody.createMethodBody(
- (byte)0, (byte)9, // AMQP version (major, minor)
- messageHeaders.getAppId(), // String appId
- messageHeaders.getJMSHeaders(), // FieldTable applicationHeaders
- content[i], // Content body
- messageHeaders.getEncoding(), // String contentEncoding
- messageHeaders.getContentType(), // String contentType
- messageHeaders.getCorrelationId(), // String correlationId
- (short)deliveryMode, // short deliveryMode
- messageHeaders.getDestination(), // String destination
- destination.getExchangeName(), // String exchange
- messageHeaders.getExpiration(), // long expiration
- immediate, // boolean immediate
- messageHeaders.getMessageId(), // String messageId
- (short)priority, // short priority
- false, // boolean redelivered
- messageHeaders.getReplyTo(), // String replyTo
- destination.getRoutingKey(), // String routingKey
- new String("abc123").getBytes(), // byte[] securityToken
- 0, // int ticket
- messageHeaders.getTimestamp(), // long timestamp
- messageHeaders.getTransactionId(), // String transactionId
- timeToLive, // long ttl
- messageHeaders.getUserId()); // String userId
+ try
+ {
+ AMQMethodBody methodBody = MessageTransferBody.createMethodBody(
+ (byte)0, (byte)9, // AMQP version (major, minor)
+ messageHeaders.getAppId(), // String appId
+ messageHeaders.getJMSHeaders(), // FieldTable applicationHeaders
+ content[i], // Content body
+ messageHeaders.getEncoding(), // String contentEncoding
+ messageHeaders.getContentType(), // String contentType
+ messageHeaders.getCorrelationId(), // String correlationId
+ (short)deliveryMode, // short deliveryMode
+ messageHeaders.getDestination(), // String destination
+ destination.getExchangeName(), // String exchange
+ messageHeaders.getExpiration(), // long expiration
+ immediate, // boolean immediate
+ messageHeaders.getMessageId(), // String messageId
+ (short)priority, // short priority
+ message.getJMSRedelivered(), // boolean redelivered
+ messageHeaders.getReplyTo(), // String replyTo
+ destination.getRoutingKey(), // String routingKey
+ new String("abc123").getBytes(), // byte[] securityToken
+ 0, // int ticket
+ messageHeaders.getTimestamp(), // long timestamp
+ messageHeaders.getTransactionId(), // String transactionId
+ timeToLive, // long ttl
+ messageHeaders.getUserId()); // String userId
- _protocolHandler.writeRequest(_channelId, methodBody);
+ _protocolHandler.writeRequest(_channelId, methodBody);
+ }
+ catch (AMQException e)
+ {
+ throw new JMSException(e.toString());
+ }
}
Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageTransferMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageTransferMethodHandler.java?view=diff&rev=497179&r1=497178&r2=497179
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageTransferMethodHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageTransferMethodHandler.java Wed Jan 17 13:33:03 2007
@@ -52,6 +52,7 @@
MessageTransferBody transferBody = (MessageTransferBody) evt.getMethod();
msg.content = transferBody.getBody();
msg.channelId = evt.getChannelId();
+ msg.deliveryTag = evt.getRequestId();
_logger.debug("New JmsDeliver method received");
MessageHeaders messageHeaders = new MessageHeaders();