You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ra...@apache.org on 2007/09/05 21:54:25 UTC

svn commit: r573039 - /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/

Author: rajith
Date: Wed Sep  5 12:54:24 2007
New Revision: 573039

URL: http://svn.apache.org/viewvc?rev=573039&view=rev
Log:
Separated the 0-8 functionality from AMQSession, BasicMessageProducer and BasicMessageConsumer, and made those classes abstract

Added:
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java
Modified:
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java?rev=573039&r1=573038&r2=573039&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java Wed Sep  5 12:54:24 2007
@@ -535,7 +535,7 @@
                         // open it, so that there is no window where we could receive data on the channel and not be set
                         // up to handle it appropriately.
                         AMQSession session =
-                                new AMQSession(AMQConnection.this, channelId, transacted, acknowledgeMode, prefetchHigh,
+                                new AMQSession_0_8(AMQConnection.this, channelId, transacted, acknowledgeMode, prefetchHigh,
                                                prefetchLow);
                         // _protocolHandler.addSessionByChannel(channelId, session);
                         registerSession(channelId, session);

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=573039&r1=573038&r2=573039&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Wed Sep  5 12:54:24 2007
@@ -21,6 +21,41 @@
 package org.apache.qpid.client;
 
 
+import java.io.Serializable;
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.jms.BytesMessage;
+import javax.jms.Destination;
+import javax.jms.IllegalStateException;
+import javax.jms.InvalidDestinationException;
+import javax.jms.InvalidSelectorException;
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.ObjectMessage;
+import javax.jms.Queue;
+import javax.jms.QueueBrowser;
+import javax.jms.QueueReceiver;
+import javax.jms.QueueSender;
+import javax.jms.QueueSession;
+import javax.jms.StreamMessage;
+import javax.jms.TemporaryQueue;
+import javax.jms.TemporaryTopic;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import javax.jms.TopicPublisher;
+import javax.jms.TopicSession;
+import javax.jms.TopicSubscriber;
+
 import org.apache.qpid.AMQException;
 import org.apache.qpid.AMQInvalidArgumentException;
 import org.apache.qpid.AMQInvalidRoutingKeyException;
@@ -129,7 +164,7 @@
  * @todo Two new objects created on every failover supported method call. Consider more efficient ways of doing this,
  *       after looking at worse bottlenecks first.
  */
-public class AMQSession extends Closeable implements Session, QueueSession, TopicSession
+public abstract class AMQSession extends Closeable implements Session, QueueSession, TopicSession
 {
     /** Used for debugging. */
     private static final Logger _logger = LoggerFactory.getLogger(AMQSession.class);
@@ -174,16 +209,16 @@
     public static final String IMMEDIATE_PREFETCH_DEFAULT = "false";
 
     /** The connection to which this session belongs. */
-    private AMQConnection _connection;
+    protected AMQConnection _connection;
 
     /** Used to indicate whether or not this is a transactional session. */
-    private boolean _transacted;
+    protected boolean _transacted;
 
     /** Holds the sessions acknowledgement mode. */
-    private int _acknowledgeMode;
+    protected int _acknowledgeMode;
 
     /** Holds this session unique identifier, used to distinguish it from other sessions. */
-    private int _channelId;
+    protected int _channelId;
 
     /** @todo This does not appear to be set? */
     private int _ticket;
@@ -231,7 +266,7 @@
     private Dispatcher _dispatcher;
 
     /** Holds the message factory factory for this session. */
-    private MessageFactoryRegistry _messageFactoryRegistry;
+    protected MessageFactoryRegistry _messageFactoryRegistry;
 
     /** Holds all of the producers created by this session, keyed by their unique identifiers. */
     private Map<Long, MessageProducer> _producers = new ConcurrentHashMap<Long, MessageProducer>();
@@ -246,7 +281,7 @@
      * Maps from identifying tags to message consumers, in order to pass dispatch incoming messages to the right
      * consumer.
      */
-    private Map<AMQShortString, BasicMessageConsumer> _consumers =
+    protected Map<AMQShortString, BasicMessageConsumer> _consumers =
             new ConcurrentHashMap<AMQShortString, BasicMessageConsumer>();
 
     /** Provides a count of consumers on destinations, in order to be able to know if a destination has consumers. */
@@ -428,19 +463,7 @@
      *
      * @todo Be aware of possible changes to parameter order as versions change.
      */
-    public void acknowledgeMessage(long deliveryTag, boolean multiple)
-    {
-        final AMQFrame ackFrame =
-                BasicAckBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), deliveryTag,
-                                            multiple);
-
-        if (_logger.isDebugEnabled())
-        {
-            _logger.debug("Sending ack for delivery tag " + deliveryTag + " on channel " + _channelId);
-        }
-
-        getProtocolHandler().writeFrame(ackFrame);
-    }
+    public abstract void acknowledgeMessage(long deliveryTag, boolean multiple);
 
     /**
      * Binds the named queue, with the specified routing key, to the named exchange.
@@ -466,22 +489,15 @@
         {
             public Object execute() throws AMQException, FailoverException
             {
-                AMQFrame queueBind =
-                        QueueBindBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(),
-                                                     arguments, // arguments
-                                                     exchangeName, // exchange
-                                                     false, // nowait
-                                                     queueName, // queue
-                                                     routingKey, // routingKey
-                                                     getTicket()); // ticket
-
-                getProtocolHandler().syncWrite(queueBind, QueueBindOkBody.class);
-
+                sendQueueBind(queueName,routingKey,arguments,exchangeName);
                 return null;
             }
         }, _connection).execute();
     }
 
+    public abstract void sendQueueBind(final AMQShortString queueName, final AMQShortString routingKey, final FieldTable arguments,
+            final AMQShortString exchangeName) throws AMQException, FailoverException;
+
     /**
 
      * Closes the session.
@@ -525,20 +541,7 @@
 
                     try
                     {
-
-                        getProtocolHandler().closeSession(this);
-
-                        final AMQFrame frame =
-                                ChannelCloseBody.createAMQFrame(getChannelId(), getProtocolMajorVersion(), getProtocolMinorVersion(),
-                                                                0, // classId
-                                                                0, // methodId
-                                                                AMQConstant.REPLY_SUCCESS.getCode(), // replyCode
-                                                                new AMQShortString("JMS client closing channel")); // replyText
-
-                        getProtocolHandler().syncWrite(frame, ChannelCloseOkBody.class, timeout);
-
-                        // When control resumes at this point, a reply will have been received that
-                        // indicates the broker has closed the channel successfully.
+                        sendClose(timeout);
                     }
                     catch (AMQException e)
                     {
@@ -562,6 +565,8 @@
         }
     }
 
+    public abstract void sendClose(long timeout) throws AMQException, FailoverException;
+
     /**
      * Called when the server initiates the closure of the session unilaterally.
      *
@@ -620,10 +625,7 @@
             }
 
             // Commits outstanding messages sent and outstanding acknowledgements.
-            final AMQProtocolHandler handler = getProtocolHandler();
-
-            handler.syncWrite(TxCommitBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion()),
-                              TxCommitOkBody.class);
+            sendCommit();
         }
         catch (AMQException e)
         {
@@ -635,6 +637,8 @@
         }
     }
 
+    public abstract void sendCommit() throws AMQException, FailoverException;
+
     public void confirmConsumerCancelled(AMQShortString consumerTag)
     {
 
@@ -968,24 +972,14 @@
         {
             public Object execute() throws AMQException, FailoverException
             {
-                AMQFrame queueDeclare =
-                        QueueDeclareBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(),
-                                                        null, // arguments
-                                                        autoDelete, // autoDelete
-                                                        durable, // durable
-                                                        exclusive, // exclusive
-                                                        false, // nowait
-                                                        false, // passive
-                                                        name, // queue
-                                                        getTicket()); // ticket
-
-                getProtocolHandler().syncWrite(queueDeclare, QueueDeclareOkBody.class);
-
+                sendCreateQueue(name, autoDelete,durable,exclusive);
                 return null;
             }
         }, _connection).execute();
     }
 
+    public abstract void sendCreateQueue(AMQShortString name, final boolean autoDelete, final boolean durable,
+            final boolean exclusive)throws AMQException, FailoverException;
     /**
      * Creates a QueueReceiver
      *
@@ -1356,20 +1350,7 @@
                 _dispatcher.rollback();
             }
 
-            if (isStrictAMQP())
-            {
-                // We can't use the BasicRecoverBody-OK method as it isn't part of the spec.
-                _connection.getProtocolHandler().writeFrame(BasicRecoverBody.createAMQFrame(_channelId,
-                                                                                            getProtocolMajorVersion(), getProtocolMinorVersion(), false)); // requeue
-                _logger.warn("Session Recover cannot be guaranteed with STRICT_AMQP. Messages may arrive out of order.");
-            }
-            else
-            {
-
-                _connection.getProtocolHandler().syncWrite(
-                        BasicRecoverBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), false) // requeue
-                        , BasicRecoverOkBody.class);
-            }
+            sendRecover();
 
             if (!isSuspended)
             {
@@ -1386,6 +1367,8 @@
         }
     }
 
+    public abstract void sendRecover() throws AMQException, FailoverException;
+
     public void rejectMessage(UnprocessedMessage message, boolean requeue)
     {
 
@@ -1408,22 +1391,7 @@
 
     }
 
-    public void rejectMessage(long deliveryTag, boolean requeue)
-    {
-        if ((_acknowledgeMode == CLIENT_ACKNOWLEDGE) || (_acknowledgeMode == SESSION_TRANSACTED))
-        {
-            if (_logger.isDebugEnabled())
-            {
-                _logger.debug("Rejecting delivery tag:" + deliveryTag);
-            }
-
-            AMQFrame basicRejectBody =
-                    BasicRejectBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), deliveryTag,
-                                                   requeue);
-
-            _connection.getProtocolHandler().writeFrame(basicRejectBody);
-        }
-    }
+    public abstract void rejectMessage(long deliveryTag, boolean requeue);
 
     /**
      * Commits all messages done in this transaction and releases any locks currently held.
@@ -1458,8 +1426,7 @@
                     _dispatcher.rollback();
                 }
 
-                _connection.getProtocolHandler().syncWrite(TxRollbackBody.createAMQFrame(_channelId,
-                                                                                         getProtocolMajorVersion(), getProtocolMinorVersion()), TxRollbackOkBody.class);
+                sendRollback();
 
                 if (!isSuspended)
                 {
@@ -1477,6 +1444,13 @@
         }
     }
 
+    public void sendRollback() throws AMQException, FailoverException
+    {
+        _connection.getProtocolHandler().syncWrite(TxRollbackBody.createAMQFrame(_channelId,
+                getProtocolMajorVersion(), getProtocolMinorVersion()), TxRollbackOkBody.class);
+
+    }
+
     public void run()
     {
         throw new java.lang.UnsupportedOperationException();
@@ -1591,7 +1565,6 @@
 
                         AMQDestination amqd = (AMQDestination) destination;
 
-                        final AMQProtocolHandler protocolHandler = getProtocolHandler();
                         // TODO: Define selectors in AMQP
                         // TODO: construct the rawSelector from the selector string if rawSelector == null
                         final FieldTable ft = FieldTableFactory.newFieldTable();
@@ -1602,10 +1575,8 @@
                             ft.addAll(rawSelector);
                         }
 
-                        BasicMessageConsumer consumer =
-                                new BasicMessageConsumer(_channelId, _connection, amqd, messageSelector, noLocal,
-                                                         _messageFactoryRegistry, AMQSession.this, protocolHandler, ft, prefetchHigh, prefetchLow,
-                                                         exclusive, _acknowledgeMode, noConsume, autoClose);
+                        BasicMessageConsumer consumer = createMessageConsumer(amqd, prefetchHigh,prefetchLow,
+                                noLocal,exclusive, messageSelector, ft, noConsume, autoClose);
 
                         if (_messageListener != null)
                         {
@@ -1653,6 +1624,10 @@
                 }, _connection).execute();
     }
 
+    public abstract BasicMessageConsumer createMessageConsumer(final AMQDestination destination, final int prefetchHigh,
+            final int prefetchLow, final boolean noLocal, final boolean exclusive, String selector, final FieldTable rawSelector,
+            final boolean noConsume, final boolean autoClose);
+
     /**
      * Called by the MessageConsumer when closing, to deregister the consumer from the map from consumerTag to consumer
      * instance.
@@ -1710,38 +1685,8 @@
      *
      * @todo Be aware of possible changes to parameter order as versions change.
      */
-    boolean isQueueBound(final AMQShortString exchangeName, final AMQShortString queueName, final AMQShortString routingKey)
-            throws JMSException
-    {
-        try
-        {
-            AMQMethodEvent response =
-                    new FailoverRetrySupport<AMQMethodEvent, AMQException>(
-                            new FailoverProtectedOperation<AMQMethodEvent, AMQException>()
-                            {
-                                public AMQMethodEvent execute() throws AMQException, FailoverException
-                                {
-                                    AMQFrame boundFrame =
-                                            ExchangeBoundBody.createAMQFrame(_channelId, getProtocolMajorVersion(),
-                                                                             getProtocolMinorVersion(), exchangeName, // exchange
-                                                                             queueName, // queue
-                                                                             routingKey); // routingKey
-
-                                    return getProtocolHandler().syncWrite(boundFrame, ExchangeBoundOkBody.class);
-
-                                }
-                            }, _connection).execute();
-
-            // Extract and return the response code from the query.
-            ExchangeBoundOkBody responseBody = (ExchangeBoundOkBody) response.getMethod();
-
-            return (responseBody.replyCode == 0);
-        }
-        catch (AMQException e)
-        {
-            throw new JMSAMQException("Queue bound query failed: " + e.getMessage(), e);
-        }
-    }
+    public abstract boolean isQueueBound(final AMQShortString exchangeName, final AMQShortString queueName, final AMQShortString routingKey)
+            throws JMSException;
 
     /**
      * Called to mark the session as being closed. Useful when the session needs to be made invalid, e.g. after failover
@@ -2048,50 +1993,15 @@
     private void consumeFromQueue(BasicMessageConsumer consumer, AMQShortString queueName,
                                   AMQProtocolHandler protocolHandler, boolean nowait, String messageSelector) throws AMQException, FailoverException
     {
-        // need to generate a consumer tag on the client so we can exploit the nowait flag
+        //need to generate a consumer tag on the client so we can exploit the nowait flag
         AMQShortString tag = new AMQShortString(Integer.toString(_nextTag++));
-
-        FieldTable arguments = FieldTableFactory.newFieldTable();
-        if ((messageSelector != null) && !messageSelector.equals(""))
-        {
-            arguments.put(AMQPFilterTypes.JMS_SELECTOR.getValue(), messageSelector);
-        }
-
-        if (consumer.isAutoClose())
-        {
-            arguments.put(AMQPFilterTypes.AUTO_CLOSE.getValue(), Boolean.TRUE);
-        }
-
-        if (consumer.isNoConsume())
-        {
-            arguments.put(AMQPFilterTypes.NO_CONSUME.getValue(), Boolean.TRUE);
-        }
-
         consumer.setConsumerTag(tag);
         // we must register the consumer in the map before we actually start listening
         _consumers.put(tag, consumer);
 
         try
         {
-            // TODO: Be aware of possible changes to parameter order as versions change.
-            AMQFrame jmsConsume =
-                    BasicConsumeBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), arguments, // arguments
-                                                    tag, // consumerTag
-                                                    consumer.isExclusive(), // exclusive
-                                                    consumer.getAcknowledgeMode() == Session.NO_ACKNOWLEDGE, // noAck
-                                                    consumer.isNoLocal(), // noLocal
-                                                    nowait, // nowait
-                                                    queueName, // queue
-                                                    getTicket()); // ticket
-
-            if (nowait)
-            {
-                protocolHandler.writeFrame(jmsConsume);
-            }
-            else
-            {
-                protocolHandler.syncWrite(jmsConsume, BasicConsumeOkBody.class);
-            }
+            sendConsume(consumer,queueName,protocolHandler,nowait,messageSelector,tag);
         }
         catch (AMQException e)
         {
@@ -2101,6 +2011,9 @@
         }
     }
 
+    public abstract void sendConsume(BasicMessageConsumer consumer, AMQShortString queueName,
+            AMQProtocolHandler protocolHandler, boolean nowait, String messageSelector,AMQShortString tag) throws AMQException, FailoverException;
+
     private BasicMessageProducer createProducerImpl(Destination destination, boolean mandatory, boolean immediate)
             throws JMSException
     {
@@ -2117,9 +2030,8 @@
                     {
                         checkNotClosed();
                         long producerId = getNextProducerId();
-                        BasicMessageProducer producer =
-                                new BasicMessageProducer(_connection, (AMQDestination) destination, _transacted, _channelId,
-                                                         AMQSession.this, getProtocolHandler(), producerId, immediate, mandatory, waitUntilSent);
+                        BasicMessageProducer producer = createMessageProducer(destination, mandatory,
+                                immediate, waitUntilSent, producerId);
                         registerProducer(producerId, producer);
 
                         return producer;
@@ -2127,6 +2039,9 @@
                 }, _connection).execute();
     }
 
+    public abstract BasicMessageProducer createMessageProducer(final Destination destination, final boolean mandatory,
+            final boolean immediate, final boolean waitUntilSent, long producerId);
+
     private void declareExchange(AMQDestination amqd, AMQProtocolHandler protocolHandler, boolean nowait) throws AMQException
     {
         declareExchange(amqd.getExchangeName(), amqd.getExchangeClass(), protocolHandler, nowait);
@@ -2147,31 +2062,22 @@
      * @todo Be aware of possible changes to parameter order as versions change.
      */
     private void declareExchange(final AMQShortString name, final AMQShortString type,
-                                 final AMQProtocolHandler protocolHandler, final boolean nowait) throws AMQException
+            final AMQProtocolHandler protocolHandler, final boolean nowait) throws AMQException
     {
         new FailoverNoopSupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>()
         {
             public Object execute() throws AMQException, FailoverException
             {
-                AMQFrame exchangeDeclare =
-                        ExchangeDeclareBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(),
-                                                           null, // arguments
-                                                           false, // autoDelete
-                                                           false, // durable
-                                                           name, // exchange
-                                                           false, // internal
-                                                           nowait, // nowait
-                                                           false, // passive
-                                                           getTicket(), // ticket
-                                                           type); // type
-
-                protocolHandler.syncWrite(exchangeDeclare, ExchangeDeclareOkBody.class);
-
+                sendExchangeDeclare(name, type, protocolHandler, nowait);
                 return null;
             }
         }, _connection).execute();
     }
 
+    public abstract void sendExchangeDeclare(final AMQShortString name, final AMQShortString type, final AMQProtocolHandler protocolHandler,
+            final boolean nowait) throws AMQException, FailoverException;
+
+
     /**
      * Declares a queue for a JMS destination.
      *
@@ -2208,24 +2114,15 @@
                             amqd.setQueueName(protocolHandler.generateQueueName());
                         }
 
-                        AMQFrame queueDeclare =
-                                QueueDeclareBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(),
-                                                                null, // arguments
-                                                                amqd.isAutoDelete(), // autoDelete
-                                                                amqd.isDurable(), // durable
-                                                                amqd.isExclusive(), // exclusive
-                                                                false, // nowait
-                                                                false, // passive
-                                                                amqd.getAMQQueueName(), // queue
-                                                                getTicket()); // ticket
-
-                        protocolHandler.syncWrite(queueDeclare, QueueDeclareOkBody.class);
+                        sendQueueDeclare(amqd,protocolHandler);
 
                         return amqd.getAMQQueueName();
                     }
                 }, _connection).execute();
     }
 
+    public abstract void sendQueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler)throws AMQException, FailoverException;
+
     /**
      * Undeclares the specified queue.
      *
@@ -2245,16 +2142,7 @@
             {
                 public Object execute() throws AMQException, FailoverException
                 {
-                    AMQFrame queueDeleteFrame =
-                            QueueDeleteBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(),
-                                                           false, // ifEmpty
-                                                           false, // ifUnused
-                                                           true, // nowait
-                                                           queueName, // queue
-                                                           getTicket()); // ticket
-
-                    getProtocolHandler().syncWrite(queueDeleteFrame, QueueDeleteOkBody.class);
-
+                    sendQueueDelete(queueName);
                     return null;
                 }
             }, _connection).execute();
@@ -2265,22 +2153,24 @@
         }
     }
 
+    public abstract void sendQueueDelete(final AMQShortString queueName)  throws AMQException, FailoverException;
+
     private long getNextProducerId()
     {
         return ++_nextProducerId;
     }
 
-    private AMQProtocolHandler getProtocolHandler()
+    protected AMQProtocolHandler getProtocolHandler()
     {
         return _connection.getProtocolHandler();
     }
 
-    private byte getProtocolMajorVersion()
+    protected byte getProtocolMajorVersion()
     {
         return getProtocolHandler().getProtocolMajorVersion();
     }
 
-    private byte getProtocolMinorVersion()
+    protected byte getProtocolMinorVersion()
     {
         return getProtocolHandler().getProtocolMinorVersion();
     }
@@ -2538,12 +2428,7 @@
                 }
 
                 _suspended = suspend;
-
-                AMQFrame channelFlowFrame =
-                        ChannelFlowBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(),
-                                                       !suspend);
-
-                _connection.getProtocolHandler().syncWrite(channelFlowFrame, ChannelFlowOkBody.class);
+                sendSuspendChannel(suspend);
             }
             catch (FailoverException e)
             {
@@ -2551,6 +2436,8 @@
             }
         }
     }
+
+    public abstract void sendSuspendChannel(boolean suspend) throws AMQException, FailoverException;
 
     /** Responsible for decoding a message fragment and passing it to the appropriate message consumer. */
     private class Dispatcher extends Thread

Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java?rev=573039&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java Wed Sep  5 12:54:24 2007
@@ -0,0 +1,347 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.client;
+
+
+import javax.jms.Destination;
+import javax.jms.JMSException;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.failover.FailoverException;
+import org.apache.qpid.client.failover.FailoverProtectedOperation;
+import org.apache.qpid.client.failover.FailoverRetrySupport;
+import org.apache.qpid.client.message.MessageFactoryRegistry;
+import org.apache.qpid.client.protocol.AMQProtocolHandler;
+import org.apache.qpid.common.AMQPFilterTypes;
+import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.BasicAckBody;
+import org.apache.qpid.framing.BasicConsumeBody;
+import org.apache.qpid.framing.BasicConsumeOkBody;
+import org.apache.qpid.framing.BasicRecoverBody;
+import org.apache.qpid.framing.BasicRecoverOkBody;
+import org.apache.qpid.framing.BasicRejectBody;
+import org.apache.qpid.framing.ChannelCloseBody;
+import org.apache.qpid.framing.ChannelCloseOkBody;
+import org.apache.qpid.framing.ChannelFlowBody;
+import org.apache.qpid.framing.ChannelFlowOkBody;
+import org.apache.qpid.framing.ExchangeBoundBody;
+import org.apache.qpid.framing.ExchangeBoundOkBody;
+import org.apache.qpid.framing.ExchangeDeclareBody;
+import org.apache.qpid.framing.ExchangeDeclareOkBody;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.FieldTableFactory;
+import org.apache.qpid.framing.QueueBindBody;
+import org.apache.qpid.framing.QueueBindOkBody;
+import org.apache.qpid.framing.QueueDeclareBody;
+import org.apache.qpid.framing.QueueDeclareOkBody;
+import org.apache.qpid.framing.QueueDeleteBody;
+import org.apache.qpid.framing.QueueDeleteOkBody;
+import org.apache.qpid.framing.TxCommitBody;
+import org.apache.qpid.framing.TxCommitOkBody;
+import org.apache.qpid.jms.Session;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.protocol.AMQMethodEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AMQSession_0_8 extends AMQSession
+{
+
+    /** Used for debugging. Named after this class so that 0-8 specific output is logged under its own category. */
+    private static final Logger _logger = LoggerFactory.getLogger(AMQSession_0_8.class);
+
+    /**
+     * Creates a new AMQP 0-8 session on a connection. All arguments are passed straight to the
+     * {@link AMQSession} constructor.
+     *
+     * @param con                     The connection on which to create the session.
+     * @param channelId               The unique identifier for the session.
+     * @param transacted              Indicates whether or not the session is transactional.
+     * @param acknowledgeMode         The acknowledgement mode for the session.
+     * @param messageFactoryRegistry  The message factory registry for the session.
+     * @param defaultPrefetchHighMark The maximum number of messages to prefetch before suspending the session.
+     * @param defaultPrefetchLowMark  The number of prefetched messages at which to resume the session.
+     */
+    AMQSession_0_8(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode,
+               MessageFactoryRegistry messageFactoryRegistry, int defaultPrefetchHighMark, int defaultPrefetchLowMark)
+    {
+
+         super(con,channelId,transacted,acknowledgeMode,messageFactoryRegistry,defaultPrefetchHighMark,defaultPrefetchLowMark);
+    }
+
+    /**
+     * Creates a new AMQP 0-8 session on a connection, using the registry returned by
+     * {@link MessageFactoryRegistry#newDefaultRegistry()} as the message factory registry.
+     *
+     * @param con                     The connection on which to create the session.
+     * @param channelId               The unique identifier for the session.
+     * @param transacted              Indicates whether or not the session is transactional.
+     * @param acknowledgeMode         The acknowledgement mode for the session.
+     * @param defaultPrefetchHigh     The maximum number of messages to prefetch before suspending the session.
+     * @param defaultPrefetchLow      The number of prefetched messages at which to resume the session.
+     */
+    AMQSession_0_8(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode, int defaultPrefetchHigh,
+               int defaultPrefetchLow)
+    {
+        this(con, channelId, transacted, acknowledgeMode, MessageFactoryRegistry.newDefaultRegistry(), defaultPrefetchHigh,
+             defaultPrefetchLow);
+    }
+
+    /**
+     * Builds a basic.ack frame for the given delivery tag on this session's channel and hands it to the
+     * protocol handler via {@code writeFrame}; no reply is waited for here.
+     *
+     * @param deliveryTag The delivery tag to acknowledge.
+     * @param multiple    Passed through as the "multiple" flag of the ack frame.
+     */
+    public void acknowledgeMessage(long deliveryTag, boolean multiple)
+    {
+        if (_logger.isDebugEnabled())
+        {
+            _logger.debug("Sending ack for delivery tag " + deliveryTag + " on channel " + _channelId);
+        }
+
+        getProtocolHandler().writeFrame(
+            BasicAckBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(),
+                                        deliveryTag, multiple));
+    }
+
+    /**
+     * Sends a queue.bind frame (with nowait unset) and waits for the corresponding queue.bind-ok.
+     *
+     * @param queueName    The queue to bind.
+     * @param routingKey   The routing key to bind with.
+     * @param arguments    The bind arguments, passed through unchanged.
+     * @param exchangeName The exchange to bind the queue to.
+     *
+     * @throws AMQException      Propagated from the protocol handler.
+     * @throws FailoverException Propagated from the protocol handler.
+     */
+    public void sendQueueBind(final AMQShortString queueName, final AMQShortString routingKey, final FieldTable arguments,
+            final AMQShortString exchangeName) throws AMQException, FailoverException
+    {
+        final boolean nowait = false;
+
+        final AMQFrame bindFrame =
+            QueueBindBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(),
+                                         arguments, exchangeName, nowait, queueName, routingKey, getTicket());
+
+        getProtocolHandler().syncWrite(bindFrame, QueueBindOkBody.class);
+    }
+
+    /**
+     * Notifies the protocol handler that this session is closing, then sends channel.close with a
+     * REPLY_SUCCESS reply code and waits for channel.close-ok.
+     *
+     * @param timeout Passed to {@code syncWrite} as the wait limit for the reply.
+     *
+     * @throws AMQException      Propagated from the protocol handler.
+     * @throws FailoverException Propagated from the protocol handler.
+     */
+    public void sendClose(long timeout) throws AMQException, FailoverException
+    {
+        getProtocolHandler().closeSession(this);
+
+        // No class or method is being blamed for the close, so both ids are zero.
+        final int classId = 0;
+        final int methodId = 0;
+        final AMQShortString replyText = new AMQShortString("JMS client closing channel");
+
+        final AMQFrame closeFrame =
+            ChannelCloseBody.createAMQFrame(getChannelId(), getProtocolMajorVersion(), getProtocolMinorVersion(),
+                                            classId, methodId, AMQConstant.REPLY_SUCCESS.getCode(), replyText);
+
+        // Once syncWrite returns, the close-ok reply for this channel has been received.
+        getProtocolHandler().syncWrite(closeFrame, ChannelCloseOkBody.class, timeout);
+    }
+
+    /**
+     * Sends tx.commit on this session's channel and waits for tx.commit-ok.
+     *
+     * @throws AMQException      Propagated from the protocol handler.
+     * @throws FailoverException Propagated from the protocol handler.
+     */
+    public void sendCommit() throws AMQException, FailoverException
+    {
+        final AMQFrame commitFrame =
+            TxCommitBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion());
+
+        getProtocolHandler().syncWrite(commitFrame, TxCommitOkBody.class);
+    }
+
+    /**
+     * Sends a non-passive queue.declare without arguments for the named queue and waits for
+     * queue.declare-ok.
+     *
+     * @param name       The queue name.
+     * @param autoDelete The autoDelete flag of the declare.
+     * @param durable    The durable flag of the declare.
+     * @param exclusive  The exclusive flag of the declare.
+     *
+     * @throws AMQException      Propagated from the protocol handler.
+     * @throws FailoverException Propagated from the protocol handler.
+     */
+    public void sendCreateQueue(AMQShortString name, final boolean autoDelete, final boolean durable, final boolean exclusive) throws AMQException,
+            FailoverException
+    {
+        final boolean nowait = false;
+        final boolean passive = false;
+
+        final AMQFrame declareFrame =
+            QueueDeclareBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(),
+                                            null, // arguments
+                                            autoDelete, durable, exclusive, nowait, passive, name, getTicket());
+
+        getProtocolHandler().syncWrite(declareFrame, QueueDeclareOkBody.class);
+    }
+
+    /**
+     * Sends basic.recover (requeue unset) on this session's channel.
+     *
+     * <p>When the session is not in strict AMQP mode the call waits for the recover-ok reply. In strict mode the
+     * frame is only written and a warning is logged, because the reply method is not part of the spec.
+     *
+     * @throws AMQException      Propagated from the protocol handler.
+     * @throws FailoverException Propagated from the protocol handler.
+     */
+    public void sendRecover() throws AMQException, FailoverException
+    {
+        final boolean requeue = false;
+        final AMQFrame recoverFrame =
+            BasicRecoverBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), requeue);
+
+        if (!isStrictAMQP())
+        {
+            _connection.getProtocolHandler().syncWrite(recoverFrame, BasicRecoverOkBody.class);
+
+            return;
+        }
+
+        // We can't use the BasicRecoverBody-OK method as it isn't part of the spec.
+        _connection.getProtocolHandler().writeFrame(recoverFrame);
+        _logger.warn("Session Recover cannot be guaranteed with STRICT_AMQP. Messages may arrive out of order.");
+    }
+
+    /**
+     * Sends basic.reject for the given delivery tag, but only when the session's acknowledge mode is
+     * CLIENT_ACKNOWLEDGE or SESSION_TRANSACTED; in any other mode the call does nothing.
+     *
+     * @param deliveryTag The delivery tag to reject.
+     * @param requeue     Passed through as the requeue flag of the reject frame.
+     */
+    public void rejectMessage(long deliveryTag, boolean requeue)
+    {
+        final boolean rejectAllowed =
+            (_acknowledgeMode == CLIENT_ACKNOWLEDGE) || (_acknowledgeMode == SESSION_TRANSACTED);
+
+        if (!rejectAllowed)
+        {
+            return;
+        }
+
+        if (_logger.isDebugEnabled())
+        {
+            _logger.debug("Rejecting delivery tag:" + deliveryTag);
+        }
+
+        final AMQFrame rejectFrame =
+            BasicRejectBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(),
+                                           deliveryTag, requeue);
+
+        _connection.getProtocolHandler().writeFrame(rejectFrame);
+    }
+
+    /**
+     * Asks the broker, via exchange.bound, whether the queue is bound to the exchange with the routing key.
+     * The query runs inside a {@link FailoverRetrySupport} wrapper.
+     *
+     * @param exchangeName The exchange to query.
+     * @param queueName    The queue to query.
+     * @param routingKey   The routing key to query.
+     *
+     * @return true if the reply code of the exchange.bound-ok response is zero, false otherwise.
+     *
+     * @throws JMSException If the query fails with an AMQException (wrapped in a JMSAMQException).
+     */
+    public boolean isQueueBound(final AMQShortString exchangeName, final AMQShortString queueName, final AMQShortString routingKey)
+            throws JMSException
+    {
+        final FailoverProtectedOperation<AMQMethodEvent, AMQException> boundQuery =
+            new FailoverProtectedOperation<AMQMethodEvent, AMQException>()
+            {
+                public AMQMethodEvent execute() throws AMQException, FailoverException
+                {
+                    return getProtocolHandler().syncWrite(
+                        ExchangeBoundBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(),
+                                                         exchangeName, queueName, routingKey),
+                        ExchangeBoundOkBody.class);
+                }
+            };
+
+        try
+        {
+            final AMQMethodEvent reply =
+                new FailoverRetrySupport<AMQMethodEvent, AMQException>(boundQuery, _connection).execute();
+
+            // The answer to the query is carried in the reply code of the bound-ok body.
+            final ExchangeBoundOkBody boundOk = (ExchangeBoundOkBody) reply.getMethod();
+
+            return boundOk.replyCode == 0;
+        }
+        catch (AMQException e)
+        {
+            throw new JMSAMQException("Queue bound query failed: " + e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Registers the consumer under the given tag and sends basic.consume for the named queue.
+     *
+     * <p>The consume arguments carry the JMS selector (when one is supplied and non-empty) and the auto-close and
+     * no-consume markers when the consumer reports them. The consumer is stored in the session's consumer map
+     * before the frame is sent.
+     *
+     * @param consumer        The consumer being subscribed; its consumer tag is set to {@code tag}.
+     * @param queueName       The queue to consume from.
+     * @param protocolHandler The protocol handler used to send the frame.
+     * @param nowait          If true the frame is only written; otherwise the call waits for basic.consume-ok.
+     * @param messageSelector The JMS selector; not sent when null or empty.
+     * @param tag             The consumer tag to subscribe with.
+     *
+     * @throws AMQException      Propagated from the protocol handler.
+     * @throws FailoverException Propagated from the protocol handler.
+     */
+    public void sendConsume(BasicMessageConsumer consumer, AMQShortString queueName, AMQProtocolHandler protocolHandler, boolean nowait,
+            String messageSelector, AMQShortString tag) throws AMQException, FailoverException
+    {
+        final FieldTable consumeArguments = FieldTableFactory.newFieldTable();
+
+        final boolean hasSelector = (messageSelector != null) && !messageSelector.equals("");
+        if (hasSelector)
+        {
+            consumeArguments.put(AMQPFilterTypes.JMS_SELECTOR.getValue(), messageSelector);
+        }
+
+        if (consumer.isAutoClose())
+        {
+            consumeArguments.put(AMQPFilterTypes.AUTO_CLOSE.getValue(), Boolean.TRUE);
+        }
+
+        if (consumer.isNoConsume())
+        {
+            consumeArguments.put(AMQPFilterTypes.NO_CONSUME.getValue(), Boolean.TRUE);
+        }
+
+        // we must register the consumer in the map before we actually start listening
+        consumer.setConsumerTag(tag);
+        _consumers.put(tag, consumer);
+
+        final boolean noAck = consumer.getAcknowledgeMode() == Session.NO_ACKNOWLEDGE;
+
+        // TODO: Be aware of possible changes to parameter order as versions change.
+        final AMQFrame consumeFrame =
+            BasicConsumeBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(),
+                                            consumeArguments, tag, consumer.isExclusive(), noAck, consumer.isNoLocal(),
+                                            nowait, queueName, getTicket());
+
+        if (!nowait)
+        {
+            protocolHandler.syncWrite(consumeFrame, BasicConsumeOkBody.class);
+
+            return;
+        }
+
+        protocolHandler.writeFrame(consumeFrame);
+    }
+
+    /**
+     * Sends a non-passive, non-durable exchange.declare without arguments for the named exchange.
+     *
+     * <p>When {@code nowait} is false the call waits for exchange.declare-ok. When it is true the frame is only
+     * written: with nowait set no declare-ok reply is expected, so waiting for one would stall the caller.
+     * This mirrors how {@code sendConsume} treats its nowait flag.
+     *
+     * @param name            The exchange name.
+     * @param type            The exchange type.
+     * @param protocolHandler The protocol handler used to send the frame.
+     * @param nowait          Whether to send the declare without waiting for a reply.
+     *
+     * @throws AMQException      Propagated from the protocol handler.
+     * @throws FailoverException Propagated from the protocol handler.
+     */
+    public void sendExchangeDeclare(final AMQShortString name, final AMQShortString type, final AMQProtocolHandler protocolHandler,
+            final boolean nowait) throws AMQException, FailoverException
+    {
+        AMQFrame exchangeDeclare = ExchangeDeclareBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), null, // arguments
+                false, // autoDelete
+                false, // durable
+                name, // exchange
+                false, // internal
+                nowait, // nowait
+                false, // passive
+                getTicket(), // ticket
+                type); // type
+
+        if (nowait)
+        {
+            // No reply is due for a nowait declare, so do not block waiting for one.
+            protocolHandler.writeFrame(exchangeDeclare);
+        }
+        else
+        {
+            protocolHandler.syncWrite(exchangeDeclare, ExchangeDeclareOkBody.class);
+        }
+    }
+
+    /**
+     * Sends a non-passive queue.declare without arguments for the destination's queue and waits for
+     * queue.declare-ok. The autoDelete, durable and exclusive flags and the queue name are read from the
+     * destination.
+     *
+     * @param amqd            The destination whose queue is declared.
+     * @param protocolHandler The protocol handler used to send the frame.
+     *
+     * @throws AMQException      Propagated from the protocol handler.
+     * @throws FailoverException Propagated from the protocol handler.
+     */
+    public void sendQueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler) throws AMQException, FailoverException
+    {
+        final boolean nowait = false;
+        final boolean passive = false;
+
+        final AMQFrame declareFrame =
+            QueueDeclareBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(),
+                                            null, // arguments
+                                            amqd.isAutoDelete(), amqd.isDurable(), amqd.isExclusive(), nowait, passive,
+                                            amqd.getAMQQueueName(), getTicket());
+
+        protocolHandler.syncWrite(declareFrame, QueueDeclareOkBody.class);
+    }
+
+    /**
+     * Sends an unconditional queue.delete (ifEmpty and ifUnused both unset) for the named queue and waits for
+     * queue.delete-ok.
+     *
+     * <p>The frame is sent with nowait unset because this method blocks for the delete-ok reply; asking the
+     * broker not to reply while waiting for that reply is contradictory.
+     *
+     * @param queueName The queue to delete.
+     *
+     * @throws AMQException      Propagated from the protocol handler.
+     * @throws FailoverException Propagated from the protocol handler.
+     */
+    public void sendQueueDelete(final AMQShortString queueName) throws AMQException, FailoverException
+    {
+        AMQFrame queueDeleteFrame = QueueDeleteBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), false, // ifEmpty
+                false, // ifUnused
+                false, // nowait - a reply is required, since we wait for QueueDeleteOkBody below
+                queueName, // queue
+                getTicket()); // ticket
+
+        getProtocolHandler().syncWrite(queueDeleteFrame, QueueDeleteOkBody.class);
+    }
+
+    /**
+     * Sends channel.flow for this session's channel and waits for channel.flow-ok. The flag placed in the
+     * frame is the negation of {@code suspend}.
+     *
+     * @param suspend true to suspend the channel, false to resume it.
+     *
+     * @throws AMQException      Propagated from the protocol handler.
+     * @throws FailoverException Propagated from the protocol handler.
+     */
+    public void sendSuspendChannel(boolean suspend) throws AMQException, FailoverException
+    {
+        final boolean flowFlag = !suspend;
+        final AMQFrame flowFrame =
+            ChannelFlowBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion(), flowFlag);
+
+        _connection.getProtocolHandler().syncWrite(flowFrame, ChannelFlowOkBody.class);
+    }
+
+    /**
+     * Creates the 0-8 message consumer for this session, supplying the session's channel id, connection,
+     * message factory registry, protocol handler and acknowledge mode alongside the given arguments.
+     *
+     * @param destination     The destination to consume from.
+     * @param prefetchHigh    The prefetch high mark passed to the consumer.
+     * @param prefetchLow     The prefetch low mark passed to the consumer.
+     * @param noLocal         The noLocal flag passed to the consumer.
+     * @param exclusive       The exclusive flag passed to the consumer.
+     * @param messageSelector The JMS selector passed to the consumer.
+     * @param ft              The raw selector field table passed to the consumer.
+     * @param noConsume       The noConsume flag passed to the consumer.
+     * @param autoClose       The autoClose flag passed to the consumer.
+     *
+     * @return The newly constructed consumer.
+     */
+    public BasicMessageConsumer_0_8 createMessageConsumer(final AMQDestination destination, final int prefetchHigh,
+            final int prefetchLow, final boolean noLocal, final boolean exclusive, String messageSelector, final FieldTable ft,
+            final boolean noConsume, final boolean autoClose)
+    {
+        return new BasicMessageConsumer_0_8(_channelId, _connection, destination, messageSelector, noLocal,
+                                            _messageFactoryRegistry, this, getProtocolHandler(), ft,
+                                            prefetchHigh, prefetchLow, exclusive, _acknowledgeMode,
+                                            noConsume, autoClose);
+    }
+
+
+    /**
+     * Creates the 0-8 message producer for this session, supplying the session's connection, transacted flag,
+     * channel id and protocol handler alongside the given arguments.
+     *
+     * @param destination   The destination to produce to; it is cast to {@link AMQDestination}.
+     * @param mandatory     The mandatory flag passed to the producer.
+     * @param immediate     The immediate flag passed to the producer.
+     * @param waitUntilSent The waitUntilSent flag passed to the producer.
+     * @param producerId    The id passed to the producer.
+     *
+     * @return The newly constructed producer.
+     */
+    public BasicMessageProducer createMessageProducer(final Destination destination, final boolean mandatory,
+            final boolean immediate, final boolean waitUntilSent, long producerId)
+    {
+        final AMQDestination amqDestination = (AMQDestination) destination;
+
+        return new BasicMessageProducer_0_8(_connection, amqDestination, _transacted, _channelId, this,
+                                            getProtocolHandler(), producerId, immediate, mandatory, waitUntilSent);
+    }
+}

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?rev=573039&r1=573038&r2=573039&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java Wed Sep  5 12:54:24 2007
@@ -20,37 +20,31 @@
  */
 package org.apache.qpid.client;
 
-import org.apache.qpid.AMQException;
-import org.apache.qpid.client.failover.FailoverException;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+
 import org.apache.qpid.client.message.AbstractJMSMessage;
 import org.apache.qpid.client.message.MessageFactoryRegistry;
 import org.apache.qpid.client.message.UnprocessedMessage;
 import org.apache.qpid.client.protocol.AMQProtocolHandler;
-import org.apache.qpid.framing.AMQFrame;
 import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.BasicCancelBody;
-import org.apache.qpid.framing.BasicCancelOkBody;
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.jms.MessageConsumer;
 import org.apache.qpid.jms.Session;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageListener;
-
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.List;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-
-public class BasicMessageConsumer extends Closeable implements MessageConsumer
+public abstract class BasicMessageConsumer extends Closeable implements MessageConsumer
 {
     private static final Logger _logger = LoggerFactory.getLogger(BasicMessageConsumer.class);
 
@@ -69,10 +63,10 @@
     private final AtomicReference<MessageListener> _messageListener = new AtomicReference<MessageListener>();
 
     /** The consumer tag allows us to close the consumer by sending a jmsCancel method to the broker */
-    private AMQShortString _consumerTag;
+    protected AMQShortString _consumerTag;
 
     /** We need to know the channel id when constructing frames */
-    private int _channelId;
+    protected int _channelId;
 
     /**
      * Used in the blocking receive methods to receive a message from the Session thread. <p/> Or to notify of errors
@@ -84,7 +78,7 @@
 
     private final AMQSession _session;
 
-    private AMQProtocolHandler _protocolHandler;
+    protected AMQProtocolHandler _protocolHandler;
 
     /** We need to store the "raw" field table so that we can resubscribe in the event of failover being required */
     private FieldTable _rawSelectorFieldTable;
@@ -482,29 +476,7 @@
                 if (sendClose)
                 {
                     // TODO: Be aware of possible changes to parameter order as versions change.
-                    final AMQFrame cancelFrame =
-                        BasicCancelBody.createAMQFrame(_channelId, _protocolHandler.getProtocolMajorVersion(),
-                            _protocolHandler.getProtocolMinorVersion(), _consumerTag, // consumerTag
-                            false); // nowait
-
-                    try
-                    {
-                        _protocolHandler.syncWrite(cancelFrame, BasicCancelOkBody.class);
-
-                        if (_logger.isDebugEnabled())
-                        {
-                            _logger.debug("CancelOk'd for consumer:" + debugIdentity());
-                        }
-
-                    }
-                    catch (AMQException e)
-                    {
-                        throw new JMSAMQException("Error closing consumer: " + e, e);
-                    }
-                    catch (FailoverException e)
-                    {
-                        throw new JMSAMQException("FailoverException interrupted basic cancel.", e);
-                    }
+                    sendCancel();
                 }
                 else
                 {
@@ -527,6 +499,8 @@
             }
         }
     }
+
+    public abstract void sendCancel() throws JMSAMQException;
 
     /**
      * Called when you need to invalidate a consumer. Used for example when failover has occurred and the client has

Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java?rev=573039&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java Wed Sep  5 12:54:24 2007
@@ -0,0 +1,74 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.client;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.failover.FailoverException;
+import org.apache.qpid.client.message.MessageFactoryRegistry;
+import org.apache.qpid.client.protocol.AMQProtocolHandler;
+import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.BasicCancelBody;
+import org.apache.qpid.framing.BasicCancelOkBody;
+import org.apache.qpid.framing.FieldTable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * AMQP 0-8 flavour of {@link BasicMessageConsumer}: supplies the protocol specific basic.cancel exchange used
+ * when the consumer is closed.
+ */
+public class BasicMessageConsumer_0_8 extends BasicMessageConsumer
+{
+    protected final Logger _logger = LoggerFactory.getLogger(getClass());
+
+    /**
+     * Creates the consumer; every argument is handed unchanged to the {@link BasicMessageConsumer} constructor.
+     */
+    protected BasicMessageConsumer_0_8(int channelId, AMQConnection connection, AMQDestination destination,
+            String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession session,
+            AMQProtocolHandler protocolHandler, FieldTable rawSelectorFieldTable, int prefetchHigh, int prefetchLow,
+            boolean exclusive, int acknowledgeMode, boolean noConsume, boolean autoClose)
+    {
+        super(channelId, connection, destination, messageSelector, noLocal, messageFactory, session, protocolHandler,
+              rawSelectorFieldTable, prefetchHigh, prefetchLow, exclusive, acknowledgeMode, noConsume, autoClose);
+    }
+
+    /**
+     * Sends basic.cancel (nowait unset) for this consumer's tag and waits for basic.cancel-ok.
+     *
+     * @throws JMSAMQException If the cancel fails with an AMQException or is interrupted by a FailoverException.
+     */
+    public void sendCancel() throws JMSAMQException
+    {
+        final boolean nowait = false;
+        final AMQFrame frame =
+            BasicCancelBody.createAMQFrame(_channelId, _protocolHandler.getProtocolMajorVersion(),
+                                           _protocolHandler.getProtocolMinorVersion(), _consumerTag, nowait);
+
+        try
+        {
+            _protocolHandler.syncWrite(frame, BasicCancelOkBody.class);
+
+            if (_logger.isDebugEnabled())
+            {
+                _logger.debug("CancelOk'd for consumer:" + debugIdentity());
+            }
+        }
+        catch (AMQException e)
+        {
+            throw new JMSAMQException("Error closing consumer: " + e, e);
+        }
+        catch (FailoverException e)
+        {
+            throw new JMSAMQException("FailoverException interrupted basic cancel.", e);
+        }
+    }
+}
\ No newline at end of file

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java?rev=573039&r1=573038&r2=573039&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java Wed Sep  5 12:54:24 2007
@@ -20,23 +20,8 @@
  */
 package org.apache.qpid.client;
 
-import org.apache.mina.common.ByteBuffer;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.client.message.AbstractJMSMessage;
-import org.apache.qpid.client.message.MessageConverter;
-import org.apache.qpid.client.protocol.AMQProtocolHandler;
-import org.apache.qpid.framing.AMQFrame;
-import org.apache.qpid.framing.BasicConsumeBody;
-import org.apache.qpid.framing.BasicContentHeaderProperties;
-import org.apache.qpid.framing.BasicPublishBody;
-import org.apache.qpid.framing.CompositeAMQDataBlock;
-import org.apache.qpid.framing.ContentBody;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.ExchangeDeclareBody;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import java.io.UnsupportedEncodingException;
+import java.util.UUID;
 
 import javax.jms.BytesMessage;
 import javax.jms.DeliveryMode;
@@ -51,10 +36,15 @@
 import javax.jms.TextMessage;
 import javax.jms.Topic;
 
-import java.io.UnsupportedEncodingException;
-import java.util.UUID;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.message.AbstractJMSMessage;
+import org.apache.qpid.client.message.MessageConverter;
+import org.apache.qpid.client.protocol.AMQProtocolHandler;
+import org.apache.qpid.framing.ContentBody;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-public class BasicMessageProducer extends Closeable implements org.apache.qpid.jms.MessageProducer
+public abstract class BasicMessageProducer extends Closeable implements org.apache.qpid.jms.MessageProducer
 {
     protected final Logger _logger = LoggerFactory.getLogger(getClass());
 
@@ -63,7 +53,7 @@
     /**
      * If true, messages will not get a timestamp.
      */
-    private boolean _disableTimestamps;
+    protected boolean _disableTimestamps;
 
     /**
      * Priority of messages created by this producer.
@@ -95,14 +85,14 @@
      */
     private String _mimeType;
 
-    private AMQProtocolHandler _protocolHandler;
+    protected AMQProtocolHandler _protocolHandler;
 
     /**
      * True if this producer was created from a transacted session
      */
     private boolean _transacted;
 
-    private int _channelId;
+    protected int _channelId;
 
     /**
      * This is an id generated by the session and is used to tie individual producers to the session. This means we
@@ -115,7 +105,7 @@
     /**
      * The session used to create this producer
      */
-    private AMQSession _session;
+    protected AMQSession _session;
 
     private final boolean _immediate;
 
@@ -156,24 +146,7 @@
         }
     }
 
-    private void declareDestination(AMQDestination destination)
-    {
-        // Declare the exchange
-        // Note that the durable and internal arguments are ignored since passive is set to false
-        // TODO: Be aware of possible changes to parameter order as versions change.
-        AMQFrame declare =
-            ExchangeDeclareBody.createAMQFrame(_channelId, _protocolHandler.getProtocolMajorVersion(),
-                _protocolHandler.getProtocolMinorVersion(), null, // arguments
-                false, // autoDelete
-                false, // durable
-                destination.getExchangeName(), // exchange
-                false, // internal
-                true, // nowait
-                false, // passive
-                _session.getTicket(), // ticket
-                destination.getExchangeClass()); // type
-        _protocolHandler.writeFrame(declare);
-    }
+    public abstract void declareDestination(AMQDestination destination);
 
     public void setDisableMessageID(boolean b) throws JMSException
     {
@@ -485,81 +458,13 @@
 
         message.getJmsHeaders().setInteger(CustomJMSXProperty.JMS_QPID_DESTTYPE.getShortStringName(), type);
 
-        // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
-        // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
-        // Be aware of possible changes to parameter order as versions change.
-        AMQFrame publishFrame =
-            BasicPublishBody.createAMQFrame(_channelId, _protocolHandler.getProtocolMajorVersion(),
-                _protocolHandler.getProtocolMinorVersion(), destination.getExchangeName(), // exchange
-                immediate, // immediate
-                mandatory, // mandatory
-                destination.getRoutingKey(), // routingKey
-                _session.getTicket()); // ticket
-
-        message.prepareForSending();
-        ByteBuffer payload = message.getData();
-        BasicContentHeaderProperties contentHeaderProperties = message.getContentHeaderProperties();
-
-        if (!_disableTimestamps)
-        {
-            final long currentTime = System.currentTimeMillis();
-            contentHeaderProperties.setTimestamp(currentTime);
-
-            if (timeToLive > 0)
-            {
-                contentHeaderProperties.setExpiration(currentTime + timeToLive);
-            }
-            else
-            {
-                contentHeaderProperties.setExpiration(0);
-            }
-        }
-
-        contentHeaderProperties.setDeliveryMode((byte) deliveryMode);
-        contentHeaderProperties.setPriority((byte) priority);
-
-        final int size = (payload != null) ? payload.limit() : 0;
-        final int contentBodyFrameCount = calculateContentBodyFrameCount(payload);
-        final AMQFrame[] frames = new AMQFrame[2 + contentBodyFrameCount];
-
-        if (payload != null)
-        {
-            createContentBodies(payload, frames, 2, _channelId);
-        }
-
-        if ((contentBodyFrameCount != 0) && _logger.isDebugEnabled())
-        {
-            _logger.debug("Sending content body frames to " + destination);
-        }
-
-        // weight argument of zero indicates no child content headers, just bodies
-        // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
-        // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
-        AMQFrame contentHeaderFrame =
-            ContentHeaderBody.createAMQFrame(_channelId,
-                BasicConsumeBody.getClazz(_protocolHandler.getProtocolMajorVersion(),
-                    _protocolHandler.getProtocolMinorVersion()), 0, contentHeaderProperties, size);
-        if (_logger.isDebugEnabled())
-        {
-            _logger.debug("Sending content header frame to " + destination);
-        }
-
-        frames[0] = publishFrame;
-        frames[1] = contentHeaderFrame;
-        CompositeAMQDataBlock compositeFrame = new CompositeAMQDataBlock(frames);
-        _protocolHandler.writeFrame(compositeFrame, wait);
-
-        if (message != origMessage)
-        {
-            _logger.debug("Updating original message");
-            origMessage.setJMSPriority(message.getJMSPriority());
-            origMessage.setJMSTimestamp(message.getJMSTimestamp());
-            _logger.debug("Setting JMSExpiration:" + message.getJMSExpiration());
-            origMessage.setJMSExpiration(message.getJMSExpiration());
-            origMessage.setJMSMessageID(message.getJMSMessageID());
-        }
+        sendMessage(destination, origMessage, message, deliveryMode, priority, timeToLive,
+                 mandatory, immediate, wait);
     }
 
+    public abstract void sendMessage(AMQDestination destination, Message origMessage, AbstractJMSMessage message, int deliveryMode,
+            int priority, long timeToLive, boolean mandatory, boolean immediate, boolean wait)throws JMSException;
+
     private void checkTemporaryDestination(AMQDestination destination) throws JMSException
     {
         if (destination instanceof TemporaryDestination)
@@ -578,60 +483,6 @@
                 throw new JMSException("Cannot send to a deleted temporary destination");
             }
         }
-    }
-
-    /**
-     * Create content bodies. This will split a large message into numerous bodies depending on the negotiated
-     * maximum frame size.
-     *
-     * @param payload
-     * @param frames
-     * @param offset
-     * @param channelId @return the array of content bodies
-     */
-    private void createContentBodies(ByteBuffer payload, AMQFrame[] frames, int offset, int channelId)
-    {
-
-        if (frames.length == (offset + 1))
-        {
-            frames[offset] = ContentBody.createAMQFrame(channelId, new ContentBody(payload));
-        }
-        else
-        {
-
-            final long framePayloadMax = _session.getAMQConnection().getMaximumFrameSize() - 1;
-            long remaining = payload.remaining();
-            for (int i = offset; i < frames.length; i++)
-            {
-                payload.position((int) framePayloadMax * (i - offset));
-                int length = (remaining >= framePayloadMax) ? (int) framePayloadMax : (int) remaining;
-                payload.limit(payload.position() + length);
-                frames[i] = ContentBody.createAMQFrame(channelId, new ContentBody(payload.slice()));
-
-                remaining -= length;
-            }
-        }
-
-    }
-
-    private int calculateContentBodyFrameCount(ByteBuffer payload)
-    {
-        // we substract one from the total frame maximum size to account for the end of frame marker in a body frame
-        // (0xCE byte).
-        int frameCount;
-        if ((payload == null) || (payload.remaining() == 0))
-        {
-            frameCount = 0;
-        }
-        else
-        {
-            int dataLength = payload.remaining();
-            final long framePayloadMax = _session.getAMQConnection().getMaximumFrameSize() - 1;
-            int lastFrame = ((dataLength % framePayloadMax) > 0) ? 1 : 0;
-            frameCount = (int) (dataLength / framePayloadMax) + lastFrame;
-        }
-
-        return frameCount;
     }
 
     public void setMimeType(String mimeType) throws JMSException

Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java?rev=573039&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java Wed Sep  5 12:54:24 2007
@@ -0,0 +1,199 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.client;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+
+import org.apache.mina.common.ByteBuffer;
+import org.apache.qpid.client.message.AbstractJMSMessage;
+import org.apache.qpid.client.protocol.AMQProtocolHandler;
+import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.BasicConsumeBody;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.BasicPublishBody;
+import org.apache.qpid.framing.CompositeAMQDataBlock;
+import org.apache.qpid.framing.ContentBody;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.ExchangeDeclareBody;
+
+/** Framing-level implementation of {@link BasicMessageProducer} that builds and writes the declare and publish frames. */
+public class BasicMessageProducer_0_8 extends BasicMessageProducer
+{
+    BasicMessageProducer_0_8(AMQConnection connection, AMQDestination destination, boolean transacted, int channelId,
+            AMQSession session, AMQProtocolHandler protocolHandler, long producerId, boolean immediate, boolean mandatory,
+            boolean waitUntilSent)
+    {
+        super(connection, destination, transacted, channelId, session, protocolHandler, producerId, immediate, mandatory, waitUntilSent);
+    }
+
+    /** Declares the destination's exchange by writing a non-passive, no-wait exchange declare frame. */
+    public void declareDestination(AMQDestination destination)
+    {
+        // Note that the durable and internal arguments are ignored since passive is set to false
+        // TODO: Be aware of possible changes to parameter order as versions change.
+        AMQFrame declare =
+            ExchangeDeclareBody.createAMQFrame(_channelId, _protocolHandler.getProtocolMajorVersion(),
+                _protocolHandler.getProtocolMinorVersion(), null, // arguments
+                false, // autoDelete
+                false, // durable
+                destination.getExchangeName(), // exchange
+                false, // internal
+                true, // nowait
+                false, // passive
+                _session.getTicket(), // ticket
+                destination.getExchangeClass()); // type
+        _protocolHandler.writeFrame(declare);
+    }
+
+    /**
+     * Publishes a message as one composite block: the publish frame, the content header, then the content bodies.
+     *
+     * @param destination  supplies the exchange name and routing key of the publish frame
+     * @param origMessage  the message handed in by the caller; updated after the send if it is not {@code message}
+     * @param message      the message whose data and content header properties are actually sent
+     * @param deliveryMode written (as a byte) to the content header properties
+     * @param priority     written (as a byte) to the content header properties
+     * @param timeToLive   time to live in milliseconds; zero or less means no expiration (expiration is set to 0)
+     * @param mandatory    mandatory flag of the publish frame
+     * @param immediate    immediate flag of the publish frame
+     * @param wait         passed through to the protocol handler when the composite block is written
+     * @throws JMSException propagated from the message preparation and header accessors used here
+     */
+    public void sendMessage(AMQDestination destination, Message origMessage,AbstractJMSMessage message,
+            int deliveryMode,int priority, long timeToLive, boolean mandatory, boolean immediate, boolean wait) throws JMSException
+    {
+        // TODO: Be aware of possible changes to parameter order as versions change.
+        AMQFrame publishFrame =
+            BasicPublishBody.createAMQFrame(_channelId, _protocolHandler.getProtocolMajorVersion(),
+                _protocolHandler.getProtocolMinorVersion(), destination.getExchangeName(), // exchange
+                immediate, // immediate
+                mandatory, // mandatory
+                destination.getRoutingKey(), // routingKey
+                _session.getTicket()); // ticket
+
+        message.prepareForSending();
+        ByteBuffer payload = message.getData();
+        BasicContentHeaderProperties contentHeaderProperties = message.getContentHeaderProperties();
+
+        // Expiry follows the time to live even when timestamps are disabled; only the timestamp itself is optional.
+        final long currentTime = System.currentTimeMillis();
+        if (!_disableTimestamps)
+        {
+            contentHeaderProperties.setTimestamp(currentTime);
+        }
+        contentHeaderProperties.setExpiration((timeToLive > 0) ? (currentTime + timeToLive) : 0);
+        contentHeaderProperties.setDeliveryMode((byte) deliveryMode);
+        contentHeaderProperties.setPriority((byte) priority);
+
+        // NOTE(review): the header size uses limit() while the frame count uses remaining(); these agree only when the
+        // payload position is 0 - confirm that prepareForSending() guarantees that.
+        final int size = (payload != null) ? payload.limit() : 0;
+        final int contentBodyFrameCount = calculateContentBodyFrameCount(payload);
+        final AMQFrame[] frames = new AMQFrame[2 + contentBodyFrameCount];
+
+        if (payload != null)
+        {
+            createContentBodies(payload, frames, 2, _channelId);
+        }
+
+        if ((contentBodyFrameCount != 0) && _logger.isDebugEnabled())
+        {
+            _logger.debug("Sending content body frames to " + destination);
+        }
+
+        // weight argument of zero indicates no child content headers, just bodies
+        AMQFrame contentHeaderFrame =
+            ContentHeaderBody.createAMQFrame(_channelId,
+                BasicConsumeBody.getClazz(_protocolHandler.getProtocolMajorVersion(),
+                    _protocolHandler.getProtocolMinorVersion()), 0, contentHeaderProperties, size);
+        if (_logger.isDebugEnabled())
+        {
+            _logger.debug("Sending content header frame to " + destination);
+        }
+
+        frames[0] = publishFrame;
+        frames[1] = contentHeaderFrame;
+        _protocolHandler.writeFrame(new CompositeAMQDataBlock(frames), wait);
+
+        if (message != origMessage)
+        {
+            // Mirror priority, timestamp, expiration and message id from the sent message onto the caller's message.
+            if (_logger.isDebugEnabled())
+            {
+                _logger.debug("Updating original message");
+                _logger.debug("Setting JMSExpiration:" + message.getJMSExpiration());
+            }
+            origMessage.setJMSPriority(message.getJMSPriority());
+            origMessage.setJMSTimestamp(message.getJMSTimestamp());
+            origMessage.setJMSExpiration(message.getJMSExpiration());
+            origMessage.setJMSMessageID(message.getJMSMessageID());
+        }
+    }
+
+    /**
+     * Fills {@code frames} from index {@code offset} onwards with content body frames. When more than one body slot
+     * is available the payload is cut into slices of at most (maximum frame size - 1) bytes, one per slot.
+     *
+     * @param payload   the message data; its position and limit are moved while slicing
+     * @param frames    the array to fill; slots before {@code offset} are left untouched
+     * @param offset    index of the first content body slot in {@code frames}
+     * @param channelId the channel id the body frames are created with
+     */
+    private void createContentBodies(ByteBuffer payload, AMQFrame[] frames, int offset, int channelId)
+    {
+        if (frames.length == (offset + 1))
+        {
+            // Exactly one body slot: wrap the payload as it is, without slicing.
+            frames[offset] = ContentBody.createAMQFrame(channelId, new ContentBody(payload));
+            return;
+        }
+        final long framePayloadMax = _session.getAMQConnection().getMaximumFrameSize() - 1;
+        long remaining = payload.remaining();
+        for (int i = offset; i < frames.length; i++)
+        {
+            // Slice start positions are absolute, i.e. counted from index 0 of the buffer.
+            payload.position((int) framePayloadMax * (i - offset));
+            int length = (remaining >= framePayloadMax) ? (int) framePayloadMax : (int) remaining;
+            payload.limit(payload.position() + length);
+            frames[i] = ContentBody.createAMQFrame(channelId, new ContentBody(payload.slice()));
+            remaining -= length;
+        }
+    }
+
+    /**
+     * Returns how many content body frames the payload needs: zero for a null or empty payload, otherwise the
+     * remaining byte count divided by the per-frame payload maximum, rounded up.
+     */
+    private int calculateContentBodyFrameCount(ByteBuffer payload)
+    {
+        if ((payload == null) || (payload.remaining() == 0))
+        {
+            return 0;
+        }
+        // we subtract one from the total frame maximum size to account for the end of frame marker in a body frame
+        // (0xCE byte).
+        final long framePayloadMax = _session.getAMQConnection().getMaximumFrameSize() - 1;
+        final int dataLength = payload.remaining();
+        final int lastFrame = ((dataLength % framePayloadMax) > 0) ? 1 : 0;
+        return (int) (dataLength / framePayloadMax) + lastFrame;
+    }
+}