You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2007/01/17 22:33:04 UTC

svn commit: r497179 - in /incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client: AMQSession.java BasicMessageConsumer.java BasicMessageProducer.java handler/MessageTransferMethodHandler.java

Author: kpvdr
Date: Wed Jan 17 13:33:03 2007
New Revision: 497179

URL: http://svn.apache.org/viewvc?view=rev&rev=497179
Log:
Resolved the remaining compile problems in the client, except for a missing line in UnprocessedMessage.java.

Modified:
    incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
    incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
    incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
    incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageTransferMethodHandler.java

Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?view=diff&rev=497179&r1=497178&r2=497179
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Wed Jan 17 13:33:03 2007
@@ -797,7 +797,7 @@
         _inRecovery = inRecovery;
     }
 
-    public void acknowledge() throws JMSException
+    public void acknowledge() throws JMSException, AMQException
     {
         if (isClosed())
         {

Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?view=diff&rev=497179&r1=497178&r2=497179
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java Wed Jan 17 13:33:03 2007
@@ -239,14 +239,21 @@
 
             if (messageListener != null)
             {
-                //handle case where connection has already been started, and the dispatcher is blocked
-                //doing a put on the _synchronousQueue
-                AbstractJMSMessage jmsMsg = (AbstractJMSMessage)_synchronousQueue.poll();
-                if (jmsMsg != null)
-                {
-                    preApplicationProcessing(jmsMsg);
-                    messageListener.onMessage(jmsMsg);
-                    postDeliver(jmsMsg);
+                try
+                {
+                    //handle case where connection has already been started, and the dispatcher is blocked
+                    //doing a put on the _synchronousQueue
+                    AbstractJMSMessage jmsMsg = (AbstractJMSMessage)_synchronousQueue.poll();
+                    if (jmsMsg != null)
+                    {
+                        preApplicationProcessing(jmsMsg);
+                        messageListener.onMessage(jmsMsg);
+                        postDeliver(jmsMsg);
+                    }
+                }
+                catch (AMQException e)
+                {
+                    throw new JMSException(e.toString());
                 }
             }
         }
@@ -361,6 +368,10 @@
             _logger.warn("Interrupted: " + e);
             return null;
         }
+        catch (AMQException e)
+        {
+            throw new JMSException(e.toString());
+        }
         finally
         {
             releaseReceiving();
@@ -402,6 +413,10 @@
 
             return m;
         }
+        catch (AMQException e)
+        {
+            throw new JMSException(e.toString());
+        }
         finally
         {
             releaseReceiving();
@@ -501,12 +516,12 @@
     {
         if (_logger.isDebugEnabled())
         {
-            _logger.debug("notifyMessage called with message number " + messageFrame.content.getDestination());
+            _logger.debug("notifyMessage called with message number " + messageFrame.deliveryTag);
         }
         try
         {
-            AbstractJMSMessage jmsMessage = _messageFactory.createMessage(messageFrame.contentHeader.getDestination(),
-                                                                          messageFrame.contentHeader.getr,
+            AbstractJMSMessage jmsMessage = _messageFactory.createMessage(messageFrame.deliveryTag,
+                                                                          false,
                                                                           messageFrame.contentHeader,
                                                                           messageFrame.content);
 
@@ -541,7 +556,7 @@
         }
     }
 
-    private void preDeliver(AbstractJMSMessage msg)
+    private void preDeliver(AbstractJMSMessage msg) throws AMQException
     {
         switch (_acknowledgeMode)
         {
@@ -556,7 +571,7 @@
         }
     }
 
-    private void postDeliver(AbstractJMSMessage msg) throws JMSException
+    private void postDeliver(AbstractJMSMessage msg) throws JMSException, AMQException
     {
     	msg.setJMSDestination(_destination);
         switch (_acknowledgeMode)
@@ -608,7 +623,7 @@
     /**
      * Acknowledge up to last message delivered (if any). Used when commiting.
      */
-    void acknowledgeLastDelivered()
+    void acknowledgeLastDelivered() throws AMQException
     {
         if (_lastDeliveryTag > 0)
         {
@@ -666,7 +681,7 @@
 		}
 	}
 
-    public void acknowledge() throws JMSException
+    public void acknowledge() throws JMSException, AMQException
     {
         if(!isClosed())
         {

Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java?view=diff&rev=497179&r1=497178&r2=497179
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java Wed Jan 17 13:33:03 2007
@@ -106,6 +106,7 @@
     protected BasicMessageProducer(AMQConnection connection, AMQDestination destination, boolean transacted,
                                    int channelId, AMQSession session, AMQProtocolHandler protocolHandler,
                                    long producerId, boolean immediate, boolean mandatory, boolean waitUntilSent)
+                                   throws AMQException
     {
         _connection = connection;
         _destination = destination;
@@ -131,7 +132,7 @@
         }
     }
 
-    private void declareDestination(AMQDestination destination)
+    private void declareDestination(AMQDestination destination) throws AMQException
     {
         // Declare the exchange
         // Note that the durable and internal arguments are ignored since passive is set to false
@@ -344,7 +345,7 @@
     public void send(Destination destination, Message message, int deliveryMode,
                      int priority, long timeToLive, boolean mandatory,
                      boolean immediate, boolean waitUntilSent)
-            throws JMSException
+            throws JMSException, AMQException
     {
         checkPreConditions();
         checkDestination(destination);
@@ -494,7 +495,14 @@
             throw new JMSException("Unsupported destination class: " +
                     (destination != null ? destination.getClass() : null));
         }
-        declareDestination((AMQDestination) destination);
+        try
+        {
+            declareDestination((AMQDestination) destination);
+        }
+        catch (AMQException e)
+        {
+            throw new JMSException(e.toString());
+        }
     }
 
     protected void sendImpl(AMQDestination destination, Message message, int deliveryMode, int priority,
@@ -560,32 +568,39 @@
         }
         for (int i = 0; i < content.length; i++)
         {
-            AMQMethodBody methodBody = MessageTransferBody.createMethodBody(
-                (byte)0, (byte)9,               // AMQP version (major, minor)
-                messageHeaders.getAppId(),      // String appId
-                messageHeaders.getJMSHeaders(), // FieldTable applicationHeaders
-                content[i],                     // Content body
-                messageHeaders.getEncoding(),   // String contentEncoding
-                messageHeaders.getContentType(), // String contentType
-                messageHeaders.getCorrelationId(), // String correlationId
-                (short)deliveryMode,            // short deliveryMode
-                messageHeaders.getDestination(), // String destination
-                destination.getExchangeName(),  // String exchange
-                messageHeaders.getExpiration(), // long expiration
-                immediate,                      // boolean immediate
-                messageHeaders.getMessageId(),  // String messageId
-                (short)priority,                // short priority
-                false,                          // boolean redelivered
-                messageHeaders.getReplyTo(),    // String replyTo
-                destination.getRoutingKey(),    // String routingKey
-                new String("abc123").getBytes(), // byte[] securityToken
-                0,                              // int ticket
-                messageHeaders.getTimestamp(),  // long timestamp
-                messageHeaders.getTransactionId(), // String transactionId
-                timeToLive,                     // long ttl
-                messageHeaders.getUserId());    // String userId
+            try
+            {
+                AMQMethodBody methodBody = MessageTransferBody.createMethodBody(
+                    (byte)0, (byte)9,               // AMQP version (major, minor)
+                    messageHeaders.getAppId(),      // String appId
+                    messageHeaders.getJMSHeaders(), // FieldTable applicationHeaders
+                    content[i],                     // Content body
+                    messageHeaders.getEncoding(),   // String contentEncoding
+                    messageHeaders.getContentType(), // String contentType
+                    messageHeaders.getCorrelationId(), // String correlationId
+                    (short)deliveryMode,            // short deliveryMode
+                    messageHeaders.getDestination(), // String destination
+                    destination.getExchangeName(),  // String exchange
+                    messageHeaders.getExpiration(), // long expiration
+                    immediate,                      // boolean immediate
+                    messageHeaders.getMessageId(),  // String messageId
+                    (short)priority,                // short priority
+                    message.getJMSRedelivered(),    // boolean redelivered
+                    messageHeaders.getReplyTo(),    // String replyTo
+                    destination.getRoutingKey(),    // String routingKey
+                    new String("abc123").getBytes(), // byte[] securityToken
+                    0,                              // int ticket
+                    messageHeaders.getTimestamp(),  // long timestamp
+                    messageHeaders.getTransactionId(), // String transactionId
+                    timeToLive,                     // long ttl
+                    messageHeaders.getUserId());    // String userId
         
-            _protocolHandler.writeRequest(_channelId, methodBody);
+                _protocolHandler.writeRequest(_channelId, methodBody);
+            }
+            catch (AMQException e)
+            {
+                throw new JMSException(e.toString());
+            }
         }
 
 

Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageTransferMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageTransferMethodHandler.java?view=diff&rev=497179&r1=497178&r2=497179
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageTransferMethodHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageTransferMethodHandler.java Wed Jan 17 13:33:03 2007
@@ -52,6 +52,7 @@
     	MessageTransferBody transferBody = (MessageTransferBody) evt.getMethod();
         msg.content = transferBody.getBody();
         msg.channelId = evt.getChannelId();
+        msg.deliveryTag = evt.getRequestId();
         _logger.debug("New JmsDeliver method received");
         
         MessageHeaders messageHeaders = new MessageHeaders();