You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2008/08/08 20:31:19 UTC

svn commit: r684036 [1/2] - in /incubator/qpid/trunk/qpid/java: client/src/main/java/org/apache/qpid/client/ client/src/main/java/org/apache/qpid/client/handler/ client/src/main/java/org/apache/qpid/client/message/ client/src/main/java/org/apache/qpid/...

Author: rhs
Date: Fri Aug  8 11:31:18 2008
New Revision: 684036

URL: http://svn.apache.org/viewvc?rev=684036&view=rev
Log:
QPID-1213: Simplified UnprocessedMessage and moved version-specific code into the _0_8 and _0_10 variants

Added:
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/CloseConsumerMessage.java   (with props)
Modified:
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/TopicSubscriberAdaptor.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicReturnMethodHandler.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/ReturnMessage.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_10.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/MessagePartListener.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSessionDelegate.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/util/ByteBufferMessage.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/util/MessagePartListenerAdapter.java
    incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Header.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/util/Logger.java

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=684036&r1=684035&r2=684036&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Fri Aug  8 11:31:18 2008
@@ -63,7 +63,6 @@
 import org.apache.qpid.AMQException;
 import org.apache.qpid.AMQInvalidArgumentException;
 import org.apache.qpid.AMQInvalidRoutingKeyException;
-import org.apache.qpid.AMQUndeliveredException;
 import org.apache.qpid.client.failover.FailoverException;
 import org.apache.qpid.client.failover.FailoverNoopSupport;
 import org.apache.qpid.client.failover.FailoverProtectedOperation;
@@ -78,7 +77,6 @@
 import org.apache.qpid.framing.FieldTableFactory;
 import org.apache.qpid.framing.MethodRegistry;
 import org.apache.qpid.jms.Session;
-import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.url.AMQBindingURL;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -99,20 +97,20 @@
  * @todo Two new objects created on every failover supported method call. Consider more efficient ways of doing this,
  * after looking at worse bottlenecks first.
  */
-public abstract class AMQSession extends Closeable implements Session, QueueSession, TopicSession
+public abstract class AMQSession<C extends BasicMessageConsumer, P extends BasicMessageProducer> extends Closeable implements Session, QueueSession, TopicSession
 {
 
 
-    public static final class IdToConsumerMap
+    public static final class IdToConsumerMap<C extends BasicMessageConsumer>
     {
         private final BasicMessageConsumer[] _fastAccessConsumers = new BasicMessageConsumer[16];
-        private final ConcurrentHashMap<Integer, BasicMessageConsumer> _slowAccessConsumers = new ConcurrentHashMap<Integer, BasicMessageConsumer>();
+        private final ConcurrentHashMap<Integer, C> _slowAccessConsumers = new ConcurrentHashMap<Integer, C>();
 
-        public BasicMessageConsumer get(int id)
+        public C get(int id)
         {
             if ((id & 0xFFFFFFF0) == 0)
             {
-                return _fastAccessConsumers[id];
+                return (C) _fastAccessConsumers[id];
             }
             else
             {
@@ -120,12 +118,12 @@
             }
         }
 
-        public BasicMessageConsumer put(int id, BasicMessageConsumer consumer)
+        public C put(int id, C consumer)
         {
-            BasicMessageConsumer oldVal;
+            C oldVal;
             if ((id & 0xFFFFFFF0) == 0)
             {
-                oldVal = _fastAccessConsumers[id];
+                oldVal = (C) _fastAccessConsumers[id];
                 _fastAccessConsumers[id] = consumer;
             }
             else
@@ -137,12 +135,12 @@
 
         }
 
-        public BasicMessageConsumer remove(int id)
+        public C remove(int id)
         {
-            BasicMessageConsumer consumer;
+            C consumer;
             if ((id & 0xFFFFFFF0) == 0)
             {
-                consumer = _fastAccessConsumers[id];
+                consumer = (C) _fastAccessConsumers[id];
                 _fastAccessConsumers[id] = null;
             }
             else
@@ -154,15 +152,15 @@
 
         }
 
-        public Collection<BasicMessageConsumer> values()
+        public Collection<C> values()
         {
-            ArrayList<BasicMessageConsumer> values = new ArrayList<BasicMessageConsumer>();
+            ArrayList<C> values = new ArrayList<C>();
 
             for (int i = 0; i < 16; i++)
             {
                 if (_fastAccessConsumers[i] != null)
                 {
-                    values.add(_fastAccessConsumers[i]);
+                    values.add((C) _fastAccessConsumers[i]);
                 }
             }
             values.addAll(_slowAccessConsumers.values());
@@ -183,8 +181,6 @@
     /** Used for debugging. */
     private static final Logger _logger = LoggerFactory.getLogger(AMQSession.class);
 
-    /** Used for debugging in the dispatcher. */
-    private static final Logger _dispatcherLogger = LoggerFactory.getLogger(Dispatcher.class);
 
     /** The default maximum number of prefetched message at which to suspend the channel. */
     public static final int DEFAULT_PREFETCH_HIGH_MARK = 5000;
@@ -234,7 +230,6 @@
     /** Holds this session unique identifier, used to distinguish it from other sessions. */
     protected int _channelId;
 
-    /** @todo This does not appear to be set? */
     private int _ticket;
 
     /** Holds the high mark for prefetched message, at which the session is suspended. */
@@ -261,8 +256,8 @@
      * Holds a mapping from message consumers to their identifying names, so that their subscriptions may be looked
      * up in the {@link #_subscriptions} map.
      */
-    protected final ConcurrentHashMap<BasicMessageConsumer, String> _reverseSubscriptionMap =
-            new ConcurrentHashMap<BasicMessageConsumer, String>();
+    protected final ConcurrentHashMap<C, String> _reverseSubscriptionMap =
+            new ConcurrentHashMap<C, String>();
 
     /**
      * Used to hold incoming messages.
@@ -299,7 +294,7 @@
      * Maps from identifying tags to message consumers, in order to pass dispatch incoming messages to the right
      * consumer.
      */
-    protected final IdToConsumerMap _consumers = new IdToConsumerMap();
+    protected final IdToConsumerMap<C> _consumers = new IdToConsumerMap<C>();
 
     //Map<AMQShortString, BasicMessageConsumer> _consumers =
     //new ConcurrentHashMap<AMQShortString, BasicMessageConsumer>();
@@ -308,7 +303,7 @@
      * Contains a list of consumers which have been removed but which might still have
      * messages to acknowledge, eg in client ack or transacted modes
      */
-    private CopyOnWriteArrayList<BasicMessageConsumer> _removedConsumers = new CopyOnWriteArrayList<BasicMessageConsumer>();
+    private CopyOnWriteArrayList<C> _removedConsumers = new CopyOnWriteArrayList<C>();
 
     /** Provides a count of consumers on destinations, in order to be able to know if a destination has consumers. */
     private ConcurrentHashMap<Destination, AtomicInteger> _destinationConsumerCount =
@@ -584,7 +579,7 @@
         }, _connection).execute();
     }
 
-    public void addBindingKey(BasicMessageConsumer consumer, AMQDestination amqd, String routingKey) throws AMQException
+    public void addBindingKey(C consumer, AMQDestination amqd, String routingKey) throws AMQException
     {
         if (consumer.getQueuename() != null)
         {
@@ -755,11 +750,12 @@
 
     public abstract void sendCommit() throws AMQException, FailoverException;
 
-    public void confirmConsumerCancelled(AMQShortString consumerTag)
+
+    public void confirmConsumerCancelled(int consumerTag)
     {
 
         // Remove the consumer from the map
-        BasicMessageConsumer consumer = _consumers.get(consumerTag.toIntValue());
+        C consumer = _consumers.get(consumerTag);
         if (consumer != null)
         {
             if (!consumer.isNoConsume())  // Normal Consumer
@@ -801,7 +797,7 @@
                     }
                     else
                     {
-                        _queue.add(new UnprocessedMessage.CloseConsumerMessage(consumer));
+                        _queue.add(new CloseConsumerMessage(consumer));
                     }
                 }
             }
@@ -848,7 +844,7 @@
                                   false, false);
     }
 
-    public MessageConsumer createExclusiveConsumer(Destination destination) throws JMSException
+    public C createExclusiveConsumer(Destination destination) throws JMSException
     {
         checkValidDestination(destination);
 
@@ -927,8 +923,8 @@
             _subscriptions.get(name).close();
         }
         AMQTopic dest = AMQTopic.createDurableTopic((AMQTopic) topic, name, _connection);
-        BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(dest, messageSelector, noLocal);
-        TopicSubscriberAdaptor subscriber = new TopicSubscriberAdaptor(dest, consumer);
+        C consumer = (C) createConsumer(dest, messageSelector, noLocal);
+        TopicSubscriberAdaptor<C> subscriber = new TopicSubscriberAdaptor(dest, consumer);
         _subscriptions.put(name, subscriber);
         _reverseSubscriptionMap.put(subscriber.getMessageConsumer(), name);
 
@@ -960,23 +956,23 @@
         return msg;
     }
 
-    public BasicMessageProducer createProducer(Destination destination) throws JMSException
+    public P createProducer(Destination destination) throws JMSException
     {
         return createProducerImpl(destination, DEFAULT_MANDATORY, DEFAULT_IMMEDIATE);
     }
 
-    public BasicMessageProducer createProducer(Destination destination, boolean immediate) throws JMSException
+    public P createProducer(Destination destination, boolean immediate) throws JMSException
     {
         return createProducerImpl(destination, DEFAULT_MANDATORY, immediate);
     }
 
-    public BasicMessageProducer createProducer(Destination destination, boolean mandatory, boolean immediate)
+    public P createProducer(Destination destination, boolean mandatory, boolean immediate)
             throws JMSException
     {
         return createProducerImpl(destination, mandatory, immediate);
     }
 
-    public BasicMessageProducer createProducer(Destination destination, boolean mandatory, boolean immediate,
+    public P createProducer(Destination destination, boolean mandatory, boolean immediate,
                                                boolean waitUntilSent) throws JMSException
     {
         return createProducerImpl(destination, mandatory, immediate, waitUntilSent);
@@ -986,7 +982,7 @@
     {
         checkNotClosed();
 
-        return new TopicPublisherAdapter((BasicMessageProducer) createProducer(topic, false, false), topic);
+        return new TopicPublisherAdapter((P) createProducer(topic, false, false), topic);
     }
 
     public Queue createQueue(String queueName) throws JMSException
@@ -1074,7 +1070,7 @@
     {
         checkValidDestination(destination);
         AMQQueue dest = (AMQQueue) destination;
-        BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(destination);
+        C consumer = (C) createConsumer(destination);
 
         return new QueueReceiverAdaptor(dest, consumer);
     }
@@ -1093,7 +1089,7 @@
     {
         checkValidDestination(destination);
         AMQQueue dest = (AMQQueue) destination;
-        BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(destination, messageSelector);
+        C consumer = (C) createConsumer(destination, messageSelector);
 
         return new QueueReceiverAdaptor(dest, consumer);
     }
@@ -1111,7 +1107,7 @@
     {
         checkNotClosed();
         AMQQueue dest = (AMQQueue) queue;
-        BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(dest);
+        C consumer = (C) createConsumer(dest);
 
         return new QueueReceiverAdaptor(dest, consumer);
     }
@@ -1130,7 +1126,7 @@
     {
         checkNotClosed();
         AMQQueue dest = (AMQQueue) queue;
-        BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(dest, messageSelector);
+        C consumer = (C) createConsumer(dest, messageSelector);
 
         return new QueueReceiverAdaptor(dest, consumer);
     }
@@ -1175,7 +1171,7 @@
         AMQTopic dest = checkValidTopic(topic);
 
         // AMQTopic dest = new AMQTopic(topic.getTopicName());
-        return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createExclusiveConsumer(dest));
+        return new TopicSubscriberAdaptor(dest, (C) createExclusiveConsumer(dest));
     }
 
     /**
@@ -1195,7 +1191,7 @@
         AMQTopic dest = checkValidTopic(topic);
 
         // AMQTopic dest = new AMQTopic(topic.getTopicName());
-        return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createExclusiveConsumer(dest, messageSelector, noLocal));
+        return new TopicSubscriberAdaptor(dest, (C) createExclusiveConsumer(dest, messageSelector, noLocal));
     }
 
     public abstract TemporaryQueue createTemporaryQueue() throws JMSException;
@@ -1369,17 +1365,8 @@
         {
             _logger.debug("Message[" + message.toString() + "] received in session");
         }
-
-        if (message instanceof ReturnMessage)
-        {
-            // Return of the bounced message.
-            returnBouncedMessage((ReturnMessage) message);
-        }
-        else
-        {
-            _highestDeliveryTag.set(message.getDeliveryTag());
-            _queue.add(message);
-        }
+        _highestDeliveryTag.set(message.getDeliveryTag());
+        _queue.add(message);
     }
 
     public void declareAndBind(AMQDestination amqd)
@@ -1618,7 +1605,7 @@
         }
     }
 
-    protected MessageConsumer createConsumerImpl(final Destination destination, final int prefetchHigh,
+    protected C createConsumerImpl(final Destination destination, final int prefetchHigh,
                                                  final int prefetchLow, final boolean noLocal, final boolean exclusive, String selector, final FieldTable rawSelector,
                                                  final boolean noConsume, final boolean autoClose) throws JMSException
     {
@@ -1642,10 +1629,10 @@
             messageSelector = selector;
         }
 
-        return new FailoverRetrySupport<MessageConsumer, JMSException>(
-                new FailoverProtectedOperation<MessageConsumer, JMSException>()
+        return new FailoverRetrySupport<C, JMSException>(
+                new FailoverProtectedOperation<C, JMSException>()
                 {
-                    public MessageConsumer execute() throws JMSException, FailoverException
+                    public C execute() throws JMSException, FailoverException
                     {
                         checkNotClosed();
 
@@ -1668,7 +1655,7 @@
                             ft.put(new AMQShortString("x-filter-jms-selector"), messageSelector);
                         }
 
-                        BasicMessageConsumer consumer = createMessageConsumer(amqd, prefetchHigh, prefetchLow,
+                        C consumer = createMessageConsumer(amqd, prefetchHigh, prefetchLow,
                                                                               noLocal, exclusive, messageSelector, ft, noConsume, autoClose);
 
                         if (_messageListener != null)
@@ -1712,7 +1699,7 @@
                 }, _connection).execute();
     }
 
-    public abstract BasicMessageConsumer createMessageConsumer(final AMQDestination destination, final int prefetchHigh,
+    public abstract C createMessageConsumer(final AMQDestination destination, final int prefetchHigh,
                                                                final int prefetchLow, final boolean noLocal, final boolean exclusive, String selector, final FieldTable arguments,
                                                                final boolean noConsume, final boolean autoClose) throws JMSException;
 
@@ -1722,9 +1709,9 @@
      *
      * @param consumer the consum
      */
-    void deregisterConsumer(BasicMessageConsumer consumer)
+    void deregisterConsumer(C consumer)
     {
-        if (_consumers.remove(consumer.getConsumerTag().toIntValue()) != null)
+        if (_consumers.remove(consumer.getConsumerTag()) != null)
         {
             String subscriptionName = _reverseSubscriptionMap.remove(consumer);
             if (subscriptionName != null)
@@ -2012,12 +1999,12 @@
     {
         // we need to clone the list of consumers since the close() method updates the _consumers collection
         // which would result in a concurrent modification exception
-        final ArrayList<BasicMessageConsumer> clonedConsumers = new ArrayList<BasicMessageConsumer>(_consumers.values());
+        final ArrayList<C> clonedConsumers = new ArrayList<C>(_consumers.values());
 
-        final Iterator<BasicMessageConsumer> it = clonedConsumers.iterator();
+        final Iterator<C> it = clonedConsumers.iterator();
         while (it.hasNext())
         {
-            final BasicMessageConsumer con = it.next();
+            final C con = it.next();
             if (error != null)
             {
                 con.notifyError(error);
@@ -2048,7 +2035,7 @@
         final Iterator it = clonedProducers.iterator();
         while (it.hasNext())
         {
-            final BasicMessageProducer prod = (BasicMessageProducer) it.next();
+            final P prod = (P) it.next();
             prod.close();
         }
         // at this point the _producers map is empty
@@ -2096,20 +2083,18 @@
      *
      * @param queueName
      */
-    private void consumeFromQueue(BasicMessageConsumer consumer, AMQShortString queueName,
+    private void consumeFromQueue(C consumer, AMQShortString queueName,
                                   AMQProtocolHandler protocolHandler, boolean nowait, String messageSelector) throws AMQException, FailoverException
     {
         int tagId = _nextTag++;
-        // need to generate a consumer tag on the client so we can exploit the nowait flag
-        AMQShortString tag = new AMQShortString(Integer.toString(tagId));
 
-        consumer.setConsumerTag(tag);
+        consumer.setConsumerTag(tagId);
         // we must register the consumer in the map before we actually start listening
         _consumers.put(tagId, consumer);
 
         try
         {
-            sendConsume(consumer, queueName, protocolHandler, nowait, messageSelector, tag);
+            sendConsume(consumer, queueName, protocolHandler, nowait, messageSelector, tagId);
         }
         catch (AMQException e)
         {
@@ -2119,26 +2104,26 @@
         }
     }
 
-    public abstract void sendConsume(BasicMessageConsumer consumer, AMQShortString queueName,
-                                     AMQProtocolHandler protocolHandler, boolean nowait, String messageSelector, AMQShortString tag) throws AMQException, FailoverException;
+    public abstract void sendConsume(C consumer, AMQShortString queueName,
+                                     AMQProtocolHandler protocolHandler, boolean nowait, String messageSelector, int tag) throws AMQException, FailoverException;
 
-    private BasicMessageProducer createProducerImpl(Destination destination, boolean mandatory, boolean immediate)
+    private P createProducerImpl(Destination destination, boolean mandatory, boolean immediate)
             throws JMSException
     {
         return createProducerImpl(destination, mandatory, immediate, false);
     }
 
-    private BasicMessageProducer createProducerImpl(final Destination destination, final boolean mandatory,
+    private P createProducerImpl(final Destination destination, final boolean mandatory,
                                                     final boolean immediate, final boolean waitUntilSent) throws JMSException
     {
-        return new FailoverRetrySupport<BasicMessageProducer, JMSException>(
-                new FailoverProtectedOperation<BasicMessageProducer, JMSException>()
+        return new FailoverRetrySupport<P, JMSException>(
+                new FailoverProtectedOperation<P, JMSException>()
                 {
-                    public BasicMessageProducer execute() throws JMSException, FailoverException
+                    public P execute() throws JMSException, FailoverException
                     {
                         checkNotClosed();
                         long producerId = getNextProducerId();
-                        BasicMessageProducer producer = createMessageProducer(destination, mandatory,
+                        P producer = createMessageProducer(destination, mandatory,
                                                                               immediate, waitUntilSent, producerId);
                         registerProducer(producerId, producer);
 
@@ -2147,7 +2132,7 @@
                 }, _connection).execute();
     }
 
-    public abstract BasicMessageProducer createMessageProducer(final Destination destination, final boolean mandatory,
+    public abstract P createMessageProducer(final Destination destination, final boolean mandatory,
                                                                final boolean immediate, final boolean waitUntilSent, long producerId);
 
     private void declareExchange(AMQDestination amqd, AMQProtocolHandler protocolHandler, boolean nowait) throws AMQException
@@ -2320,12 +2305,12 @@
         }
         // we need to clone the list of consumers since the close() method updates the _consumers collection
         // which would result in a concurrent modification exception
-        final ArrayList<BasicMessageConsumer> clonedConsumers = new ArrayList<BasicMessageConsumer>(_consumers.values());
+        final ArrayList<C> clonedConsumers = new ArrayList<C>(_consumers.values());
 
-        final Iterator<BasicMessageConsumer> it = clonedConsumers.iterator();
+        final Iterator<C> it = clonedConsumers.iterator();
         while (it.hasNext())
         {
-            final BasicMessageConsumer con = it.next();
+            final C con = it.next();
             con.markClosed();
         }
         // at this point the _consumers map will be empty
@@ -2360,7 +2345,7 @@
      *
      * @throws AMQException
      */
-    private void registerConsumer(BasicMessageConsumer consumer, boolean nowait) throws AMQException // , FailoverException
+    private void registerConsumer(C consumer, boolean nowait) throws AMQException // , FailoverException
     {
         AMQDestination amqd = consumer.getDestination();
 
@@ -2424,15 +2409,16 @@
 
     private void rejectAllMessages(boolean requeue)
     {
-        rejectMessagesForConsumerTag(null, requeue);
+        rejectMessagesForConsumerTag(0, requeue, true);
     }
 
     /**
      * @param consumerTag The consumerTag to prune from queue or all if null
      * @param requeue     Should the removed messages be requeued (or discarded. Possibly to DLQ)
+     * @param rejectAllConsumers If true, messages for every consumer are rejected and consumerTag is ignored
      */
 
-    private void rejectMessagesForConsumerTag(AMQShortString consumerTag, boolean requeue)
+    private void rejectMessagesForConsumerTag(int consumerTag, boolean requeue, boolean rejectAllConsumers)
     {
         Iterator messages = _queue.iterator();
         if (_logger.isInfoEnabled())
@@ -2453,7 +2439,7 @@
         {
             UnprocessedMessage message = (UnprocessedMessage) messages.next();
 
-            if ((consumerTag == null) || message.getConsumerTag().equals(consumerTag.toString()))
+            if (rejectAllConsumers || (message.getConsumerTag() == consumerTag))
             {
                 if (_logger.isDebugEnabled())
                 {
@@ -2475,12 +2461,11 @@
 
     private void resubscribeConsumers() throws AMQException
     {
-        ArrayList consumers = new ArrayList(_consumers.values());
+        ArrayList<C> consumers = new ArrayList<C>(_consumers.values());
         _consumers.clear();
 
-        for (Iterator it = consumers.iterator(); it.hasNext();)
-        {
-            BasicMessageConsumer consumer = (BasicMessageConsumer) it.next();
+        for (C consumer : consumers)
+        {            
             consumer.failedOver();
             registerConsumer(consumer, true);
         }
@@ -2492,53 +2477,11 @@
         _logger.info(MessageFormat.format("Resubscribing producers = {0} producers.size={1}", producers, producers.size())); // FIXME: removeKey
         for (Iterator it = producers.iterator(); it.hasNext();)
         {
-            BasicMessageProducer producer = (BasicMessageProducer) it.next();
+            P producer = (P) it.next();
             producer.resubscribe();
         }
     }
 
-    private void returnBouncedMessage(final ReturnMessage msg)
-    {
-        _connection.performConnectionTask(new Runnable()
-        {
-            public void run()
-            {
-                try
-                {
-                    // Bounced message is processed here, away from the mina thread
-                    AbstractJMSMessage bouncedMessage =
-                            _messageFactoryRegistry.createMessage(0, false, msg.getExchange(),
-                                                                  msg.getRoutingKey(), msg.getContentHeader(), msg.getBodies());
-                    AMQConstant errorCode = AMQConstant.getConstant(msg.getReplyCode());
-                    AMQShortString reason = msg.getReplyText();
-                    _logger.debug("Message returned with error code " + errorCode + " (" + reason + ")");
-
-                    // @TODO should this be moved to an exception handler of sorts. Somewhere errors are converted to correct execeptions.
-                    if (errorCode == AMQConstant.NO_CONSUMERS)
-                    {
-                        _connection.exceptionReceived(new AMQNoConsumersException("Error: " + reason, bouncedMessage, null));
-                    }
-                    else if (errorCode == AMQConstant.NO_ROUTE)
-                    {
-                        _connection.exceptionReceived(new AMQNoRouteException("Error: " + reason, bouncedMessage, null));
-                    }
-                    else
-                    {
-                        _connection.exceptionReceived(
-                                new AMQUndeliveredException(errorCode, "Error: " + reason, bouncedMessage, null));
-                    }
-
-                }
-                catch (Exception e)
-                {
-                    _logger.error(
-                            "Caught exception trying to raise undelivered message exception (dump follows) - ignoring...",
-                            e);
-                }
-            }
-        });
-    }
-
     /**
      * Suspends or unsuspends this session.
      *
@@ -2641,6 +2584,10 @@
 
     }
 
+    /** Used for debugging in the dispatcher. */
+    private static final Logger _dispatcherLogger = LoggerFactory.getLogger("org.apache.qpid.client.AMQSession.Dispatcher");
+    
+
     /** Responsible for decoding a message fragment and passing it to the appropriate message consumer. */
     class Dispatcher extends Thread
     {
@@ -2652,6 +2599,8 @@
         private final AtomicLong _rollbackMark = new AtomicLong(-1);
         private String dispatcherID = "" + System.identityHashCode(this);
 
+
+
         public Dispatcher()
         {
             super("Dispatcher-Channel-" + _channelId);
@@ -2670,7 +2619,7 @@
 
         }
 
-        public void rejectPending(BasicMessageConsumer consumer)
+        public void rejectPending(C consumer)
         {
             synchronized (_lock)
             {
@@ -2685,7 +2634,7 @@
                 consumer.rollbackPendingMessages();
 
                 // Reject messages on pre-dispatch queue
-                rejectMessagesForConsumerTag(consumer.getConsumerTag(), true);
+                rejectMessagesForConsumerTag(consumer.getConsumerTag(), true, false);
                 //Let the dispatcher deal with this when it gets to them.
 
                 // closeConsumer
@@ -2712,7 +2661,7 @@
 
                 _dispatcherLogger.debug("Session Pre Dispatch Queue cleared");
 
-                for (BasicMessageConsumer consumer : _consumers.values())
+                for (C consumer : _consumers.values())
                 {
                     if (!consumer.isNoConsume())
                     {
@@ -2778,7 +2727,7 @@
                             _lock.wait();
                         }
 
-                        if (!(message instanceof UnprocessedMessage.CloseConsumerMessage)
+                        if (!(message instanceof CloseConsumerMessage)
                             && tagLE(deliveryTag, _rollbackMark.get()))
                         {
                             rejectMessage(message, true);
@@ -2840,8 +2789,8 @@
             //This if block is not needed anymore as bounce messages are handled separately
             //if (message.getDeliverBody() != null)
             //{
-            final BasicMessageConsumer consumer =
-                    _consumers.get(message.getConsumerTag().toIntValue());
+            final C consumer =
+                    _consumers.get(message.getConsumerTag());
 
             if ((consumer == null) || consumer.isClosed())
             {

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=684036&r1=684035&r2=684036&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java Fri Aug  8 11:31:18 2008
@@ -52,7 +52,7 @@
 /**
  * This is a 0.10 Session
  */
-public class AMQSession_0_10 extends AMQSession
+public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, BasicMessageProducer_0_10>
 {
 
     /**
@@ -345,7 +345,7 @@
     /**
      * Create an 0_10 message consumer
      */
-    public BasicMessageConsumer createMessageConsumer(final AMQDestination destination, final int prefetchHigh,
+    public BasicMessageConsumer_0_10 createMessageConsumer(final AMQDestination destination, final int prefetchHigh,
                                                       final int prefetchLow, final boolean noLocal,
                                                       final boolean exclusive, String messageSelector,
                                                       final FieldTable ft, final boolean noConsume,
@@ -406,8 +406,8 @@
      * This method is invoked when a consumer is creted
      * Registers the consumer with the broker
      */
-    public void sendConsume(BasicMessageConsumer consumer, AMQShortString queueName, AMQProtocolHandler protocolHandler,
-                            boolean nowait, String messageSelector, AMQShortString tag)
+    public void sendConsume(BasicMessageConsumer_0_10 consumer, AMQShortString queueName, AMQProtocolHandler protocolHandler,
+                            boolean nowait, String messageSelector, int tag)
             throws AMQException, FailoverException
     {
         boolean preAcquire;
@@ -416,10 +416,10 @@
             preAcquire = ( ! consumer.isNoConsume()  &&
                     (consumer.getMessageSelector() == null || consumer.getMessageSelector().equals("")) )
                     || !(consumer.getDestination() instanceof AMQQueue);
-            getQpidSession().messageSubscribe(queueName.toString(), tag.toString(),
+            getQpidSession().messageSubscribe(queueName.toString(), String.valueOf(tag),
                                               getAcknowledgeMode() == NO_ACKNOWLEDGE ? Session.TRANSFER_CONFIRM_MODE_NOT_REQUIRED:Session.TRANSFER_CONFIRM_MODE_REQUIRED,
                                               preAcquire ? Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE : Session.TRANSFER_ACQUIRE_MODE_NO_ACQUIRE,
-                                              new MessagePartListenerAdapter((BasicMessageConsumer_0_10) consumer), null,
+                                              (BasicMessageConsumer_0_10) consumer, null,
                                               consumer.isExclusive() ? Option.EXCLUSIVE : Option.NONE);
         }
         catch (JMSException e)
@@ -427,21 +427,23 @@
             throw new AMQException(AMQConstant.INTERNAL_ERROR, "problem when registering consumer", e);
         }
 
+        String consumerTag = ((BasicMessageConsumer_0_10)consumer).getConsumerTagString();
+
         if (! prefetch())
         {
-            getQpidSession().messageSetFlowMode(consumer.getConsumerTag().toString(), MessageFlowMode.CREDIT);
+            getQpidSession().messageSetFlowMode(consumerTag, MessageFlowMode.CREDIT);
         }
         else
         {
-            getQpidSession().messageSetFlowMode(consumer.getConsumerTag().toString(), MessageFlowMode.WINDOW);
+            getQpidSession().messageSetFlowMode(consumerTag, MessageFlowMode.WINDOW);
         }
-        getQpidSession().messageFlow(consumer.getConsumerTag().toString(), MessageCreditUnit.BYTE, 0xFFFFFFFF);
+        getQpidSession().messageFlow(consumerTag, MessageCreditUnit.BYTE, 0xFFFFFFFF);
         // We need to sync so that we get notify of an error.
         // only if not immediat prefetch
         if(prefetch() && (consumer.isStrated() || _immediatePrefetch))
         {
             // set the flow
-            getQpidSession().messageFlow(consumer.getConsumerTag().toString(),
+            getQpidSession().messageFlow(consumerTag,
                                          MessageCreditUnit.MESSAGE,
                                          getAMQConnection().getMaxPrefetch());
         }
@@ -452,7 +454,7 @@
     /**
      * Create an 0_10 message producer
      */
-    public BasicMessageProducer createMessageProducer(final Destination destination, final boolean mandatory,
+    public BasicMessageProducer_0_10 createMessageProducer(final Destination destination, final boolean mandatory,
                                                       final boolean immediate, final boolean waitUntilSent,
                                                       long producerId)
     {
@@ -542,13 +544,14 @@
         {
             for (BasicMessageConsumer consumer : _consumers.values())
             {
-                getQpidSession().messageStop(consumer.getConsumerTag().toString());
+                getQpidSession().messageStop(String.valueOf(consumer.getConsumerTag()));
             }
         }
         else
         {
-            for (BasicMessageConsumer consumer : _consumers.values())
+            for (BasicMessageConsumer_0_10 consumer : _consumers.values())
             {
+                String consumerTag = String.valueOf(consumer.getConsumerTag());
                 //only set if msg list is null
                 try
                 {
@@ -556,18 +559,18 @@
                     {
                         if (consumer.getMessageListener() != null)
                         {
-                            getQpidSession().messageFlow(consumer.getConsumerTag().toString(),
+                            getQpidSession().messageFlow(consumerTag,
                                                          MessageCreditUnit.MESSAGE, 1);
                         }
                     }
                     else
                     {
                         getQpidSession()
-                            .messageFlow(consumer.getConsumerTag().toString(), MessageCreditUnit.MESSAGE,
+                            .messageFlow(consumerTag, MessageCreditUnit.MESSAGE,
                                          getAMQConnection().getMaxPrefetch());
                     }
                     getQpidSession()
-                        .messageFlow(consumer.getConsumerTag().toString(), MessageCreditUnit.BYTE, 0xFFFFFFFF);
+                        .messageFlow(consumerTag, MessageCreditUnit.BYTE, 0xFFFFFFFF);
                 }
                 catch (Exception e)
                 {
@@ -715,7 +718,7 @@
         AMQTopic origTopic=checkValidTopic(topic);
         AMQTopic dest=AMQTopic.createDurable010Topic(origTopic, name, _connection);
 
-        TopicSubscriberAdaptor subscriber=_subscriptions.get(name);
+        TopicSubscriberAdaptor<BasicMessageConsumer_0_10> subscriber=_subscriptions.get(name);
         if (subscriber != null)
         {
             if (subscriber.getTopic().equals(topic))
@@ -766,7 +769,7 @@
             }
         }
 
-        subscriber=new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createExclusiveConsumer(dest));
+        subscriber=new TopicSubscriberAdaptor(dest, createExclusiveConsumer(dest));
 
         _subscriptions.put(name, subscriber);
         _reverseSubscriptionMap.put(subscriber.getMessageConsumer(), name);

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java?rev=684036&r1=684035&r2=684036&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java Fri Aug  8 11:31:18 2008
@@ -25,10 +25,11 @@
 import javax.jms.IllegalStateException;
 
 import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQUndeliveredException;
 import org.apache.qpid.client.failover.FailoverException;
 import org.apache.qpid.client.failover.FailoverProtectedOperation;
 import org.apache.qpid.client.failover.FailoverRetrySupport;
-import org.apache.qpid.client.message.MessageFactoryRegistry;
+import org.apache.qpid.client.message.*;
 import org.apache.qpid.client.message.AMQMessageDelegateFactory;
 import org.apache.qpid.client.protocol.AMQProtocolHandler;
 import org.apache.qpid.client.state.listener.SpecificMethodFrameListener;
@@ -43,7 +44,7 @@
 
 import java.util.Map;
 
-public final class AMQSession_0_8 extends AMQSession
+public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMessageProducer_0_8>
 {
 
     /** Used for debugging. */
@@ -218,6 +219,7 @@
         return isQueueBound(destination.getExchangeName(),destination.getAMQQueueName(),destination.getAMQQueueName());
     }
 
+
     public boolean isQueueBound(final AMQShortString exchangeName, final AMQShortString queueName, final AMQShortString routingKey)
             throws JMSException
     {
@@ -245,10 +247,14 @@
         {
             throw new JMSAMQException("Queue bound query failed: " + e.getMessage(), e);
         }
-    }
+    }    
 
-    public void sendConsume(BasicMessageConsumer consumer, AMQShortString queueName, AMQProtocolHandler protocolHandler, boolean nowait,
-            String messageSelector, AMQShortString tag) throws AMQException, FailoverException
+    @Override public void sendConsume(BasicMessageConsumer_0_8 consumer,
+                                      AMQShortString queueName,
+                                      AMQProtocolHandler protocolHandler,
+                                      boolean nowait,
+                                      String messageSelector,
+                                      int tag) throws AMQException, FailoverException
     {
         FieldTable arguments = FieldTableFactory.newFieldTable();
         if ((messageSelector != null) && !messageSelector.equals(""))
@@ -268,7 +274,7 @@
 
         BasicConsumeBody body = getMethodRegistry().createBasicConsumeBody(getTicket(),
                                                                            queueName,
-                                                                           tag,
+                                                                           new AMQShortString(String.valueOf(tag)),
                                                                            consumer.isNoLocal(),
                                                                            consumer.getAcknowledgeMode() == Session.NO_ACKNOWLEDGE,
                                                                            consumer.isExclusive(),
@@ -337,7 +343,7 @@
     }
 
 
-    public BasicMessageProducer createMessageProducer(final Destination destination, final boolean mandatory,
+    public BasicMessageProducer_0_8 createMessageProducer(final Destination destination, final boolean mandatory,
             final boolean immediate, final boolean waitUntilSent, long producerId)
     {
 
@@ -345,6 +351,66 @@
                                  this, getProtocolHandler(), producerId, immediate, mandatory, waitUntilSent);
     }
 
+
+    @Override public void messageReceived(UnprocessedMessage message)
+    {
+
+        if (message instanceof ReturnMessage)
+        {
+            // Return of the bounced message.
+            returnBouncedMessage((ReturnMessage) message);
+        }
+        else
+        {
+            super.messageReceived(message);
+        }
+    }
+
+    private void returnBouncedMessage(final ReturnMessage msg)
+    {
+        _connection.performConnectionTask(new Runnable()
+        {
+            public void run()
+            {
+                try
+                {
+                    // Bounced message is processed here, away from the mina thread
+                    AbstractJMSMessage bouncedMessage =
+                            _messageFactoryRegistry.createMessage(0, false, msg.getExchange(),
+                                                                  msg.getRoutingKey(), msg.getContentHeader(), msg.getBodies());
+                    AMQConstant errorCode = AMQConstant.getConstant(msg.getReplyCode());
+                    AMQShortString reason = msg.getReplyText();
+                    _logger.debug("Message returned with error code " + errorCode + " (" + reason + ")");
+
+                    // @TODO should this be moved to an exception handler of sorts? Somewhere errors are converted to correct exceptions.
+                    if (errorCode == AMQConstant.NO_CONSUMERS)
+                    {
+                        _connection.exceptionReceived(new AMQNoConsumersException("Error: " + reason, bouncedMessage, null));
+                    }
+                    else if (errorCode == AMQConstant.NO_ROUTE)
+                    {
+                        _connection.exceptionReceived(new AMQNoRouteException("Error: " + reason, bouncedMessage, null));
+                    }
+                    else
+                    {
+                        _connection.exceptionReceived(
+                                new AMQUndeliveredException(errorCode, "Error: " + reason, bouncedMessage, null));
+                    }
+
+                }
+                catch (Exception e)
+                {
+                    _logger.error(
+                            "Caught exception trying to raise undelivered message exception (dump follows) - ignoring...",
+                            e);
+                }
+            }
+        });
+    }
+
+
+
+
     public void sendRollback() throws AMQException, FailoverException
     {
         TxRollbackBody body = getMethodRegistry().createTxRollbackBody();
@@ -365,7 +431,7 @@
         checkNotClosed();
         AMQTopic origTopic = checkValidTopic(topic);
         AMQTopic dest = AMQTopic.createDurableTopic(origTopic, name, _connection);
-        TopicSubscriberAdaptor subscriber = _subscriptions.get(name);
+        TopicSubscriberAdaptor<BasicMessageConsumer_0_8> subscriber = _subscriptions.get(name);
         if (subscriber != null)
         {
             if (subscriber.getTopic().equals(topic))

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?rev=684036&r1=684035&r2=684036&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java Fri Aug  8 11:31:18 2008
@@ -22,10 +22,7 @@
 
 import org.apache.qpid.AMQException;
 import org.apache.qpid.client.failover.FailoverException;
-import org.apache.qpid.client.message.AbstractJMSMessage;
-import org.apache.qpid.client.message.MessageFactoryRegistry;
-import org.apache.qpid.client.message.UnprocessedMessage;
-import org.apache.qpid.client.message.AMQMessageDelegateFactory;
+import org.apache.qpid.client.message.*;
 import org.apache.qpid.client.protocol.AMQProtocolHandler;
 import org.apache.qpid.framing.*;
 import org.apache.qpid.jms.MessageConsumer;
@@ -49,7 +46,7 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
-public abstract class BasicMessageConsumer<H, B> extends Closeable implements MessageConsumer
+public abstract class BasicMessageConsumer<U> extends Closeable implements MessageConsumer
 {
     private static final Logger _logger = LoggerFactory.getLogger(BasicMessageConsumer.class);
 
@@ -72,7 +69,7 @@
     private final AtomicReference<MessageListener> _messageListener = new AtomicReference<MessageListener>();
 
     /** The consumer tag allows us to close the consumer by sending a jmsCancel method to the broker */
-    protected AMQShortString _consumerTag;
+    protected int _consumerTag;
 
     /** We need to know the channel id when constructing frames */
     protected final int _channelId;
@@ -517,7 +514,7 @@
 
             throw e;
         }
-        else if (o instanceof UnprocessedMessage.CloseConsumerMessage)
+        else if (o instanceof CloseConsumerMessage)
         {
             _closed.set(true);
             deregisterConsumer();
@@ -635,7 +632,7 @@
      * @param closeMessage
      *            this message signals that we should close the browser
      */
-    public void notifyCloseMessage(UnprocessedMessage.CloseConsumerMessage closeMessage)
+    public void notifyCloseMessage(CloseConsumerMessage closeMessage)
     {
         if (isMessageListenerSet())
         {
@@ -667,26 +664,21 @@
      *
      * @param messageFrame the raw unprocessed mesage
      */
-    void notifyMessage(UnprocessedMessage messageFrame)
+    void notifyMessage(U messageFrame)
     {
-        if (messageFrame instanceof UnprocessedMessage.CloseConsumerMessage)
+        if (messageFrame instanceof CloseConsumerMessage)
         {
-            notifyCloseMessage((UnprocessedMessage.CloseConsumerMessage) messageFrame);
+            notifyCloseMessage((CloseConsumerMessage) messageFrame);
             return;
         }
 
-        final boolean debug = _logger.isDebugEnabled();
 
-        if (debug)
-        {
-            _logger.debug("notifyMessage called with message number " + messageFrame.getDeliveryTag());
-        }
 
         try
         {
             AbstractJMSMessage jmsMessage = createJMSMessageFromUnprocessedMessage(_session.getMessageDelegateFactory(), messageFrame);
 
-            if (debug)
+            if (_logger.isDebugEnabled())
             {
                 _logger.debug("Message is of type: " + jmsMessage.getClass().getName());
             }
@@ -721,7 +713,7 @@
         }
     }
 
-    public abstract AbstractJMSMessage createJMSMessageFromUnprocessedMessage(AMQMessageDelegateFactory delegateFactory, UnprocessedMessage<H, B> messageFrame)
+    public abstract AbstractJMSMessage createJMSMessageFromUnprocessedMessage(AMQMessageDelegateFactory delegateFactory, U messageFrame)
             throws Exception;
 
     /** @param jmsMessage this message has already been processed so can't redo preDeliver */
@@ -936,12 +928,12 @@
         _session.deregisterConsumer(this);
     }
 
-    public AMQShortString getConsumerTag()
+    public int getConsumerTag()
     {
         return _consumerTag;
     }
 
-    public void setConsumerTag(AMQShortString consumerTag)
+    public void setConsumerTag(int consumerTag)
     {
         _consumerTag = consumerTag;
     }

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java?rev=684036&r1=684035&r2=684036&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java Fri Aug  8 11:31:18 2008
@@ -22,11 +22,8 @@
 import org.apache.qpid.client.message.*;
 import org.apache.qpid.client.protocol.AMQProtocolHandler;
 import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.AMQException;
-import org.apache.qpid.exchange.ExchangeDefaults;
 import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.api.Message;
 import org.apache.qpid.transport.*;
 import org.apache.qpid.QpidException;
 import org.apache.qpid.filter.MessageFilter;
@@ -35,16 +32,14 @@
 import javax.jms.InvalidSelectorException;
 import javax.jms.JMSException;
 import javax.jms.MessageListener;
-import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.Iterator;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * This is a 0.10 message consumer.
  */
-public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], ByteBuffer>
-        implements org.apache.qpid.nclient.util.MessageListener
+public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedMessage_0_10>
+        implements org.apache.qpid.nclient.MessagePartListener
 {
 
     /**
@@ -76,6 +71,7 @@
      * Specify whether this consumer is performing a sync receive
      */
     private final AtomicBoolean _syncReceive = new AtomicBoolean(false);
+    private String _consumerTagString;
 
     //--- constructor
     protected BasicMessageConsumer_0_10(int channelId, AMQConnection connection, AMQDestination destination,
@@ -106,6 +102,19 @@
         _isStarted = connection.started();
     }
 
+
+    @Override public void setConsumerTag(int consumerTag)
+    {
+        super.setConsumerTag(consumerTag);
+        _consumerTagString = String.valueOf(consumerTag);
+    }
+
+    public String getConsumerTagString()
+    {
+        return _consumerTagString;
+    }
+
+
     // ----- Interface org.apache.qpid.client.util.MessageListener
 
     /**
@@ -142,7 +151,7 @@
         {
             if (isMessageListenerSet() && ! getSession().prefetch())
             {
-                _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),
+                _0_10session.getQpidSession().messageFlow(getConsumerTagString(),
                                                           MessageCreditUnit.MESSAGE, 1);
             }
             _logger.debug("messageOk, trying to notify");
@@ -155,52 +164,19 @@
     /**
      * This method is invoked by the transport layer when a message is delivered for this
      * consumer. The message is transformed and pass to the session.
-     * @param message an 0.10 message
+     * @param xfr an 0.10 message transfer
      */
-    public void onMessage(Message message)
+    public void messageTransfer(MessageTransfer xfr)
+
+    //public void onMessage(Message message)
     {
         int channelId = getSession().getChannelId();
-        long deliveryId = message.getMessageTransferId();
-        AMQShortString consumerTag = getConsumerTag();
-        AMQShortString exchange;
-        AMQShortString routingKey;
-        boolean redelivered = false;
-        Struct[] headers = {message.getMessageProperties(), message.getDeliveryProperties()};
-        if (headers[0] == null) {
-            headers[0] = new MessageProperties(); 
-        }
-        if( message.getDeliveryProperties() != null )
-        {
-            exchange = new AMQShortString(message.getDeliveryProperties().getExchange());
-            routingKey = new AMQShortString(message.getDeliveryProperties().getRoutingKey());
-            redelivered = message.getDeliveryProperties().getRedelivered();
-        }
-        else
-        {
-            exchange = new AMQShortString("");
-            routingKey = new AMQShortString("");
-            headers[1] = new DeliveryProperties();
-        }
+        int consumerTag = getConsumerTag();
+
         UnprocessedMessage_0_10 newMessage =
-                new UnprocessedMessage_0_10(channelId, deliveryId, consumerTag, exchange, routingKey, redelivered);
-        try
-        {
-            newMessage.receiveBody(message.readData());
-        }
-        catch (IOException e)
-        {
-            getSession().getAMQConnection().exceptionReceived(e);
-        }
-        // if there is a replyto destination then we need to request the exchange info
-        ReplyTo replyTo = ((MessageProperties) headers[0]).getReplyTo();
-        if (replyTo != null && replyTo.getExchange() != null && !replyTo.getExchange().equals(""))
-        {
-            // <exch_class>://<exch_name>/[<destination>]/[<queue>]?<option>='<value>'[,<option>='<value>']*
-            // the exchnage class will be set later from within the sesion thread
-            String replyToUrl =  replyTo.getExchange() + "/" + replyTo.getRoutingKey() + "/" + replyTo.getRoutingKey();
-            newMessage.setReplyToURL(replyToUrl);
-        }
-        newMessage.setContentHeader(headers);
+            new UnprocessedMessage_0_10(consumerTag, xfr);
+
+
         getSession().messageReceived(newMessage);
         // else ignore this message
     }
@@ -213,47 +189,16 @@
      */
     @Override void sendCancel() throws AMQException
     {
-        ((AMQSession_0_10) getSession()).getQpidSession().messageCancel(getConsumerTag().toString());
+        ((AMQSession_0_10) getSession()).getQpidSession().messageCancel(getConsumerTagString());
         ((AMQSession_0_10) getSession()).getQpidSession().sync();
         // confirm cancel
         getSession().confirmConsumerCancelled(getConsumerTag());
         ((AMQSession_0_10) getSession()).getCurrentException();
     }
 
-    @Override void notifyMessage(UnprocessedMessage messageFrame)
+    @Override void notifyMessage(UnprocessedMessage_0_10 messageFrame)
     {
-        // if there is a replyto destination then we need to request the exchange info
-        String replyToURL = messageFrame.getReplyToURL();
-        if (replyToURL != null && !replyToURL.equals(""))
-        {
-            AMQShortString  shortExchangeName = new AMQShortString( replyToURL.substring(0, replyToURL.indexOf('/')));
-            String replyToUrl = "://" + replyToURL;
-            if (shortExchangeName.equals(ExchangeDefaults.TOPIC_EXCHANGE_NAME))
-            {
-                replyToUrl = ExchangeDefaults.TOPIC_EXCHANGE_CLASS + replyToUrl;
-            }
-            else if (shortExchangeName.equals(ExchangeDefaults.DIRECT_EXCHANGE_NAME))
-            {
-                replyToUrl = ExchangeDefaults.DIRECT_EXCHANGE_CLASS + replyToUrl;
-            }
-            else if (shortExchangeName.equals(ExchangeDefaults.HEADERS_EXCHANGE_NAME))
-            {
-                replyToUrl = ExchangeDefaults.HEADERS_EXCHANGE_CLASS + replyToUrl;
-            }
-            else if (shortExchangeName.equals(ExchangeDefaults.FANOUT_EXCHANGE_NAME))
-            {
-                replyToUrl = ExchangeDefaults.FANOUT_EXCHANGE_CLASS + replyToUrl;
-            }
-            else
-            {
-                Future<ExchangeQueryResult> future =
-                        ((AMQSession_0_10) getSession()).getQpidSession().exchangeQuery(shortExchangeName.toString());
-                ExchangeQueryResult res = future.get();
-                // <exch_class>://<exch_name>/[<destination>]/[<queue>]?<option>='<value>'[,<option>='<value>']*
-                replyToUrl = res.getType() + replyToUrl;
-            }
-            ((UnprocessedMessage_0_10) messageFrame).setReplyToURL(replyToUrl);
-        }
+
         super.notifyMessage(messageFrame);
     }
 
@@ -267,11 +212,10 @@
     }
 
     @Override public AbstractJMSMessage createJMSMessageFromUnprocessedMessage(
-            AMQMessageDelegateFactory delegateFactory, UnprocessedMessage<Struct[], ByteBuffer> messageFrame) throws Exception
+            AMQMessageDelegateFactory delegateFactory, UnprocessedMessage_0_10 msg) throws Exception
     {
-        return _messageFactory.createMessage(messageFrame.getDeliveryTag(), messageFrame.isRedelivered(),
-                messageFrame.getContentHeader(), messageFrame.getBodies()
-        );
+        AMQMessageDelegate_0_10.updateExchangeTypeMapping(msg.getMessageTransfer().getHeader(), ((AMQSession_0_10)getSession()).getQpidSession());
+        return _messageFactory.createMessage(msg.getMessageTransfer());
     }
 
     // private methods
@@ -327,7 +271,7 @@
             // and messages are not prefetched we then need to request another one
             if(! getSession().prefetch())
             {
-               _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),
+               _0_10session.getQpidSession().messageFlow(getConsumerTagString(),
                                                          MessageCreditUnit.MESSAGE, 1);
             }
         }
@@ -415,7 +359,7 @@
         super.setMessageListener(messageListener);
         if (messageListener != null && ! getSession().prefetch())
         {
-            _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),
+            _0_10session.getQpidSession().messageFlow(getConsumerTagString(),
                                                       MessageCreditUnit.MESSAGE, 1);
         }
         if (messageListener != null && !_synchronousQueue.isEmpty())
@@ -440,7 +384,7 @@
         _isStarted = true;
         if (_syncReceive.get())
         {
-            _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),
+            _0_10session.getQpidSession().messageFlow(getConsumerTagString(),
                                                       MessageCreditUnit.MESSAGE, 1);
         }
     }
@@ -463,7 +407,7 @@
     {
         if (isStrated() && ! getSession().prefetch() && _synchronousQueue.isEmpty())
         {
-            _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),
+            _0_10session.getQpidSession().messageFlow(getConsumerTagString(),
                                                       MessageCreditUnit.MESSAGE, 1);
         }
         if (! getSession().prefetch())
@@ -486,4 +430,5 @@
           _session.acknowledgeMessage(msg.getDeliveryTag(), false);                
         }               
     }
+
 }

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java?rev=684036&r1=684035&r2=684036&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java Fri Aug  8 11:31:18 2008
@@ -26,22 +26,14 @@
 import org.apache.qpid.AMQException;
 import org.apache.qpid.QpidException;
 import org.apache.qpid.client.failover.FailoverException;
-import org.apache.qpid.client.message.AbstractJMSMessage;
-import org.apache.qpid.client.message.MessageFactoryRegistry;
-import org.apache.qpid.client.message.UnprocessedMessage;
-import org.apache.qpid.client.message.AMQMessageDelegateFactory;
+import org.apache.qpid.client.message.*;
 import org.apache.qpid.client.protocol.AMQProtocolHandler;
 import org.apache.qpid.filter.JMSSelectorFilter;
-import org.apache.qpid.framing.AMQFrame;
-import org.apache.qpid.framing.BasicCancelBody;
-import org.apache.qpid.framing.BasicCancelOkBody;
-import org.apache.qpid.framing.ContentBody;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<ContentHeaderBody,ContentBody>
+public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<UnprocessedMessage_0_8>
 {
     protected final Logger _logger = LoggerFactory.getLogger(getClass());
 
@@ -69,7 +61,7 @@
 
     void sendCancel() throws AMQException, FailoverException
     {
-        BasicCancelBody body = getSession().getMethodRegistry().createBasicCancelBody(_consumerTag, false);
+        BasicCancelBody body = getSession().getMethodRegistry().createBasicCancelBody(new AMQShortString(String.valueOf(_consumerTag)), false);
 
         final AMQFrame cancelFrame = body.generateFrame(_channelId);
 
@@ -81,7 +73,7 @@
         }
     }
 
-     public AbstractJMSMessage createJMSMessageFromUnprocessedMessage(AMQMessageDelegateFactory delegateFactory, UnprocessedMessage<ContentHeaderBody, ContentBody> messageFrame)throws Exception
+     public AbstractJMSMessage createJMSMessageFromUnprocessedMessage(AMQMessageDelegateFactory delegateFactory, UnprocessedMessage_0_8 messageFrame)throws Exception
      {
 
         return _messageFactory.createMessage(messageFrame.getDeliveryTag(),
@@ -90,4 +82,4 @@
 
     }
 
-}
\ No newline at end of file
+}

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java?rev=684036&r1=684035&r2=684036&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java Fri Aug  8 11:31:18 2008
@@ -22,6 +22,7 @@
 import java.util.HashMap;
 import java.util.Map;
 import java.util.UUID;
+import java.nio.ByteBuffer;
 
 import javax.jms.JMSException;
 import javax.jms.Message;
@@ -36,11 +37,8 @@
 import org.apache.qpid.url.AMQBindingURL;
 import org.apache.qpid.nclient.util.ByteBufferMessage;
 import org.apache.qpid.njms.ExceptionHelper;
-import org.apache.qpid.transport.DeliveryProperties;
-import org.apache.qpid.transport.MessageDeliveryMode;
-import org.apache.qpid.transport.MessageDeliveryPriority;
-import org.apache.qpid.transport.MessageProperties;
-import org.apache.qpid.transport.ReplyTo;
+import org.apache.qpid.transport.*;
+import static org.apache.qpid.transport.Option.*;
 
 /**
  * This is a 0_10 message producer.
@@ -76,21 +74,8 @@
 
         AMQMessageDelegate_0_10 delegate = (AMQMessageDelegate_0_10) message.getDelegate();
 
-        org.apache.qpid.api.Message underlyingMessage = message.get010Message();
-        if (underlyingMessage == null)
-        {
-            underlyingMessage = new ByteBufferMessage(delegate.getMessageProperties(), delegate.getDeliveryProperties()); 
-            message.set010Message(underlyingMessage);
-
-        }
-        // force a rebuild of the 0-10 message if data has changed
-        if (message.getData() == null)
-        {
-            message.dataChanged();
-        }
-
-        DeliveryProperties deliveryProp = underlyingMessage.getDeliveryProperties();
-        MessageProperties messageProps = underlyingMessage.getMessageProperties();
+        DeliveryProperties deliveryProp = delegate.getDeliveryProperties();
+        MessageProperties messageProps = delegate.getMessageProperties();
 
         if (messageId != null)
         {
@@ -140,10 +125,10 @@
             deliveryProp.setPriority(MessageDeliveryPriority.get((short) priority));
             message.setJMSPriority(priority);
         }
-        String excahngeName = destination.getExchangeName().toString();
-        if ( deliveryProp.getExchange() == null || ! deliveryProp.getExchange().equals(excahngeName))
+        String exchangeName = destination.getExchangeName().toString();
+        if ( deliveryProp.getExchange() == null || ! deliveryProp.getExchange().equals(exchangeName))
         {
-            deliveryProp.setExchange(excahngeName);
+            deliveryProp.setExchange(exchangeName);
         }
         String routingKey = destination.getRoutingKey().toString();
         if (deliveryProp.getRoutingKey() == null || ! deliveryProp.getRoutingKey().equals(routingKey))
@@ -153,63 +138,27 @@
 
         messageProps.setContentLength(message.getContentLength());
 
-
-        /*    String replyToURL = contentHeaderProperties.getReplyToAsString();
-            if (replyToURL != null)
-            {
-                if(_logger.isDebugEnabled())
-                {
-                    StringBuffer b = new StringBuffer();
-                    b.append("\n==========================");
-                    b.append("\nReplyTo : " + replyToURL);
-                    b.append("\n==========================");
-                    _logger.debug(b.toString());
-                }
-                AMQBindingURL dest;
-                try
-                {
-                    dest = new AMQBindingURL(replyToURL);
-                }
-                catch (URISyntaxException e)
-                {
-                    throw ExceptionHelper.convertQpidExceptionToJMSException(e);
-                }
-                messageProps.setReplyTo(new ReplyTo(dest.getExchangeName().toString(), dest.getRoutingKey().toString()));
-            }
-*/
-
-
         // send the message
         try
         {
-            org.apache.qpid.nclient.Session ssn = ((AMQSession_0_10) getSession()).getQpidSession();
+            org.apache.qpid.transport.Session ssn = (org.apache.qpid.transport.Session)
+                ((AMQSession_0_10) getSession()).getQpidSession();
 
             // if true, we need to sync the delivery of this message
             boolean sync = (deliveryMode == DeliveryMode.PERSISTENT &&
                             getSession().getAMQConnection().getSyncPersistence());
 
+            org.apache.mina.common.ByteBuffer data = message.getData();
+            ByteBuffer buffer = data == null ? ByteBuffer.allocate(0) : data.buf().slice();
+            
+            ssn.messageTransfer(destination.getExchangeName().toString(), MessageAcceptMode.NONE,
+                                MessageAcquireMode.PRE_ACQUIRED,
+                                new Header(deliveryProp, messageProps),
+                    buffer, sync ? SYNC : NONE);
             if (sync)
             {
-                ssn.setAutoSync(true);
-            }
-            try
-            {
-                ssn.messageTransfer(destination.getExchangeName().toString(),
-                                    underlyingMessage,
-                                    ssn.TRANSFER_CONFIRM_MODE_NOT_REQUIRED,
-                                    ssn.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE);
+                ssn.sync();
             }
-            finally
-            {
-                if (sync)
-                {
-                    ssn.setAutoSync(false);
-                }
-            }
-        }
-        catch (IOException e)
-        {
-            throw ExceptionHelper.convertQpidExceptionToJMSException(e);
         }
         catch (RuntimeException rte)
         {

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/TopicSubscriberAdaptor.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/TopicSubscriberAdaptor.java?rev=684036&r1=684035&r2=684036&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/TopicSubscriberAdaptor.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/TopicSubscriberAdaptor.java Fri Aug  8 11:31:18 2008
@@ -32,20 +32,20 @@
  * Wraps a MessageConsumer to fulfill the extended TopicSubscriber contract
  *
  */
-class TopicSubscriberAdaptor implements TopicSubscriber
+class TopicSubscriberAdaptor<C extends BasicMessageConsumer> implements TopicSubscriber
 {
     private final Topic _topic;
-    private final BasicMessageConsumer _consumer;
+    private final C _consumer;
     private final boolean _noLocal;
 
-    TopicSubscriberAdaptor(Topic topic, BasicMessageConsumer consumer, boolean noLocal)
+    TopicSubscriberAdaptor(Topic topic, C consumer, boolean noLocal)
     {
         _topic = topic;
         _consumer = consumer;
         _noLocal = noLocal;
     }
     
-    TopicSubscriberAdaptor(Topic topic, BasicMessageConsumer consumer)
+    TopicSubscriberAdaptor(Topic topic, C consumer)
     {
         this(topic, consumer, consumer.isNoLocal());
     }
@@ -103,7 +103,7 @@
     }
     
     private void checkPreConditions() throws javax.jms.IllegalStateException{
-    	BasicMessageConsumer msgConsumer = (BasicMessageConsumer)_consumer;
+    	C msgConsumer = _consumer;
     	
     	if (msgConsumer.isClosed() ){
 			throw new javax.jms.IllegalStateException("Consumer is closed");
@@ -120,7 +120,7 @@
 		}
 	}
 
-    BasicMessageConsumer getMessageConsumer()
+    C getMessageConsumer()
     {
         return _consumer;
     }

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java?rev=684036&r1=684035&r2=684036&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java Fri Aug  8 11:31:18 2008
@@ -43,13 +43,12 @@
             throws AMQException
     {
         final UnprocessedMessage_0_8 msg = new UnprocessedMessage_0_8(
-                channelId,
                 body.getDeliveryTag(),
-                body.getConsumerTag(),
+                body.getConsumerTag().toIntValue(),
                 body.getExchange(),
                 body.getRoutingKey(),
                 body.getRedelivered());
         _logger.debug("New JmsDeliver method received:" + session);
-        session.unprocessedMessageReceived(msg);
+        session.unprocessedMessageReceived(channelId, msg);
     }
 }

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicReturnMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicReturnMethodHandler.java?rev=684036&r1=684035&r2=684036&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicReturnMethodHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicReturnMethodHandler.java Fri Aug  8 11:31:18 2008
@@ -45,14 +45,14 @@
     throws AMQException
     {
         _logger.debug("New JmsBounce method received");
-        final ReturnMessage msg = new ReturnMessage(channelId,
+        final ReturnMessage msg = new ReturnMessage(
                 body.getExchange(),
                 body.getRoutingKey(),
                 body.getReplyText(),
                 body.getReplyCode()
         );
 
-        session.unprocessedMessageReceived(msg);
+        session.unprocessedMessageReceived(channelId, msg);
     }
 
 }

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java?rev=684036&r1=684035&r2=684036&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java Fri Aug  8 11:31:18 2008
@@ -22,19 +22,14 @@
 package org.apache.qpid.client.message;
 
 import org.apache.commons.collections.map.ReferenceMap;
-import org.apache.qpid.client.AMQSession;
-import org.apache.qpid.client.CustomJMSXProperty;
-import org.apache.qpid.client.AMQDestination;
-import org.apache.qpid.client.AMQQueue;
-import org.apache.qpid.client.AMQTopic;
-import org.apache.qpid.client.AMQUndefinedDestination;
-import org.apache.qpid.client.JMSAMQException;
+import org.apache.qpid.client.*;
 import org.apache.qpid.framing.ContentHeaderProperties;
 import org.apache.qpid.framing.BasicContentHeaderProperties;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.AMQPInvalidClassException;
+import org.apache.qpid.nclient.*;
 import org.apache.qpid.jms.Message;
 import org.apache.qpid.url.BindingURL;
 import org.apache.qpid.url.AMQBindingURL;
@@ -47,8 +42,10 @@
 import javax.jms.MessageFormatException;
 import javax.jms.DeliveryMode;
 import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
 import java.net.URISyntaxException;
 import java.nio.charset.Charset;
+import org.apache.qpid.exchange.ExchangeDefaults;
 
 public class AMQMessageDelegate_0_10 implements AMQMessageDelegate
 {
@@ -68,27 +65,32 @@
     private AMQSession _session;
     private final long _deliveryTag;
 
-    private static Map<String,Integer> _exchangeTypeMap = new HashMap<String, Integer>();
+    private static Map<AMQShortString,Integer> _exchangeTypeMap = new ConcurrentHashMap<AMQShortString, Integer>(); // exchange name -> AMQDestination type
+    private static Map<String,Integer> _exchangeTypeStringMap = new ConcurrentHashMap<String, Integer>(); // same mapping, keyed by String
+    private static Map<String, Integer> _exchangeTypeToDestinationType = new ConcurrentHashMap<String, Integer>(); // exchange class -> AMQDestination type
 
     static
     {
-        // TODO - XXX - Need to add to this map when we find an exchange we don't know about
+        _exchangeTypeMap.put(ExchangeDefaults.DIRECT_EXCHANGE_NAME, AMQDestination.QUEUE_TYPE);
+        _exchangeTypeMap.put(AMQShortString.EMPTY_STRING, AMQDestination.QUEUE_TYPE);
+        _exchangeTypeMap.put(ExchangeDefaults.TOPIC_EXCHANGE_NAME, AMQDestination.TOPIC_TYPE);
+        _exchangeTypeMap.put(ExchangeDefaults.FANOUT_EXCHANGE_NAME, AMQDestination.TOPIC_TYPE);
+        // The same well-known exchanges again, keyed by plain String for updateExchangeTypeMapping.
+        _exchangeTypeStringMap.put(ExchangeDefaults.DIRECT_EXCHANGE_NAME.toString(), AMQDestination.QUEUE_TYPE);
+        _exchangeTypeStringMap.put("", AMQDestination.QUEUE_TYPE);
+        _exchangeTypeStringMap.put(ExchangeDefaults.TOPIC_EXCHANGE_NAME.toString(), AMQDestination.TOPIC_TYPE);
+        _exchangeTypeStringMap.put(ExchangeDefaults.FANOUT_EXCHANGE_NAME.toString(), AMQDestination.TOPIC_TYPE);
 
-        _exchangeTypeMap.put(null, AMQDestination.QUEUE_TYPE);
-        _exchangeTypeMap.put("amq.direct", AMQDestination.QUEUE_TYPE);
-        _exchangeTypeMap.put("", AMQDestination.QUEUE_TYPE);
-        _exchangeTypeMap.put("amq.topic", AMQDestination.TOPIC_TYPE);
-        _exchangeTypeMap.put("amq.fanout", AMQDestination.TOPIC_TYPE);
 
-        
+        _exchangeTypeToDestinationType.put(ExchangeDefaults.DIRECT_EXCHANGE_CLASS.toString(), AMQDestination.QUEUE_TYPE); // looked up with ExchangeQueryResult.getType(), so every key is an exchange class, not an exchange name
+        _exchangeTypeToDestinationType.put(ExchangeDefaults.TOPIC_EXCHANGE_CLASS.toString(), AMQDestination.TOPIC_TYPE);
+        _exchangeTypeToDestinationType.put(ExchangeDefaults.FANOUT_EXCHANGE_CLASS.toString(), AMQDestination.TOPIC_TYPE);
     }
 
     protected AMQMessageDelegate_0_10()
     {
         this(new MessageProperties(), new DeliveryProperties(), -1);
         _readableProperties = false;
-
-
     }
 
     protected AMQMessageDelegate_0_10(long deliveryTag, MessageProperties messageProps, DeliveryProperties deliveryProps, AMQShortString exchange,
@@ -99,13 +101,68 @@
 
         AMQDestination dest;
 
-        dest = new AMQUndefinedDestination(exchange, routingKey, null);
-
-        // Destination dest = AMQDestination.createDestination(url);
+        dest = generateDestination(exchange, routingKey);
         setJMSDestination(dest);
+    }
+    /** Builds the destination for an exchange/routing key pair: AMQQueue, AMQTopic or AMQUndefinedDestination, as selected by getExchangeType(exchange). */
+    private AMQDestination generateDestination(AMQShortString exchange, AMQShortString routingKey)
+    {
+        AMQDestination dest;
+        switch(getExchangeType(exchange))
+        {
+            case AMQDestination.QUEUE_TYPE:
+                dest = new AMQQueue(exchange, routingKey, routingKey); // routing key is passed as both the second and third argument
+                break;
+            case  AMQDestination.TOPIC_TYPE:
+                dest = new AMQTopic(exchange, routingKey, null);
+                break;
+            default:
+                dest = new AMQUndefinedDestination(exchange, routingKey, null);
+                // Last label of the switch, so no break is needed here.
+        }
+
+        return dest;
+    }
+    /** Looks the exchange up in _exchangeTypeMap (null is treated as AMQShortString.EMPTY_STRING); returns AMQDestination.UNKNOWN_TYPE when there is no entry. */
+    private int getExchangeType(AMQShortString exchange)
+    {
+        Integer type = _exchangeTypeMap.get(exchange == null ? AMQShortString.EMPTY_STRING : exchange);
+
+        if(type == null)
+        {
+            return AMQDestination.UNKNOWN_TYPE;
+        }
+
+
+        return type;
+    }
 
 
+    public static void updateExchangeTypeMapping(Header header, org.apache.qpid.nclient.Session session)
+    {
+        // Records the destination type of the exchange named in the header's DeliveryProperties, querying the session only for exchanges not yet in _exchangeTypeStringMap.
+        DeliveryProperties deliveryProps = header.get(DeliveryProperties.class);
+        if(deliveryProps != null)
+        {
+            String exchange = deliveryProps.getExchange();
+            if(exchange != null && !_exchangeTypeStringMap.containsKey(exchange))
+            {
+                AMQShortString exchangeShortString = new AMQShortString(exchange);
+                Future<ExchangeQueryResult> future = session.exchangeQuery(exchange);
+                ExchangeQueryResult res = future.get();
 
+                // The lookup map is a ConcurrentHashMap, whose get(null) throws NullPointerException,
+                // so a missing result or a result without a type falls through to UNKNOWN_TYPE instead.
+                Integer type = (res == null || res.getType() == null)
+                        ? null : _exchangeTypeToDestinationType.get(res.getType());
+                if(type == null)
+                {
+                    type = AMQDestination.UNKNOWN_TYPE;
+                }
+                _exchangeTypeStringMap.put(exchange, type);
+                _exchangeTypeMap.put(exchangeShortString, type);
+            }
+        }
     }
 
     protected AMQMessageDelegate_0_10(MessageProperties messageProps, DeliveryProperties deliveryProps, long deliveryTag)
@@ -212,19 +269,10 @@
                 String exchange = replyTo.getExchange();
                 String routingKey = replyTo.getRoutingKey();
 
-                int type = _exchangeTypeMap.get(exchange);
+                dest = generateDestination(exchange == null ? null : new AMQShortString(exchange),
+                        routingKey == null ? null : new AMQShortString(routingKey));
+
 
-                switch(type)
-                {
-                    case AMQDestination.QUEUE_TYPE:
-                        dest = new AMQQueue(new AMQShortString(exchange), new AMQShortString(routingKey), new AMQShortString(routingKey));
-                        break;
-                    case  AMQDestination.TOPIC_TYPE:
-                        dest = new AMQTopic(new AMQShortString(exchange), new AMQShortString(routingKey), null);
-                        break;
-                    default:
-                        dest = new AMQUndefinedDestination(new AMQShortString(exchange), new AMQShortString(routingKey), null);
-                }
 
 
 
@@ -897,4 +945,5 @@
     {
         return _deliveryProps;
     }
+
 }

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java?rev=684036&r1=684035&r2=684036&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java Fri Aug  8 11:31:18 2008
@@ -44,17 +44,11 @@
     protected boolean _readableMessage = false;
     protected boolean _changedData = true;
 
-
-    /**
-     * This is 0_10 specific
-     */
-    private org.apache.qpid.api.Message _010message = null;
     /** If the acknowledge mode is CLIENT_ACKNOWLEDGE the session is required */
 
 
 
 
-
     private AMQMessageDelegate _delegate;
     private boolean _redelivered;
 
@@ -454,52 +448,10 @@
         else
         {
             _data.flip();
-            dataChanged();
             _changedData = false;
         }
     }
 
-    public void set010Message(org.apache.qpid.api.Message m )
-    {
-        _010message = m;
-    }
-
-    public void dataChanged()
-    {
-        if (_010message != null)
-        {
-            _010message.clearData();
-            try
-            {
-                if (_data != null)
-                {
-                    _010message.appendData(_data.buf().slice());
-                }
-                else
-                {
-                    _010message.appendData(java.nio.ByteBuffer.allocate(0));
-                }
-            }
-            catch (IOException e)
-            {
-                throw new RuntimeException(e);
-            }
-        }
-    }
-
-    /**
-     * End 010 specific
-     */
-
-    public org.apache.qpid.api.Message get010Message()
-    {
-        return _010message;
-    }
-
-
-
-
-
     public int getContentLength()
     {
         if(_data != null)

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java?rev=684036&r1=684035&r2=684036&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java Fri Aug  8 11:31:18 2008
@@ -110,17 +110,17 @@
 
 
     protected AbstractJMSMessage create010MessageWithBody(long messageNbr, Struct[] contentHeader,
-                                                          List bodies) throws AMQException
+                                                          java.nio.ByteBuffer body) throws AMQException
     {
         ByteBuffer data;
         final boolean debug = _logger.isDebugEnabled();
 
-        // we optimise the non-fragmented case to avoid copying
-        if ((bodies != null))
+
+        if (body != null)
         {
-            data = ByteBuffer.wrap((java.nio.ByteBuffer) bodies.get(0));
+            data = ByteBuffer.wrap(body);
         }
-        else // bodies == null
+        else // body == null
         {
             data = ByteBuffer.allocate(0);
         }
@@ -164,11 +164,11 @@
     }
 
     public AbstractJMSMessage createMessage(long messageNbr, boolean redelivered, Struct[] contentHeader,
-                                            List bodies)
+                                            java.nio.ByteBuffer body)
             throws JMSException, AMQException
     {
         final AbstractJMSMessage msg =
-                create010MessageWithBody(messageNbr, contentHeader, bodies);
+                create010MessageWithBody(messageNbr, contentHeader, body);
         msg.setJMSRedelivered(redelivered);
         msg.receivedFromServer();
         return msg;

Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/CloseConsumerMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/CloseConsumerMessage.java?rev=684036&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/CloseConsumerMessage.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/CloseConsumerMessage.java Fri Aug  8 11:31:18 2008
@@ -0,0 +1,43 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.client.message;
+
+import org.apache.qpid.client.BasicMessageConsumer;
+/** An UnprocessedMessage built from a consumer's tag alone; it reports delivery tag 0 and redelivered == false. Presumably a marker that the consumer is closing (confirm against the session code that consumes it). */
+public final class CloseConsumerMessage extends UnprocessedMessage
+{
+    /** Passes the given consumer's tag to the superclass constructor. */
+    public CloseConsumerMessage(BasicMessageConsumer consumer)
+    {
+        super(consumer.getConsumerTag());
+    }
+
+    /** Always returns 0. */
+    public long getDeliveryTag()
+    {
+        return 0;
+    }
+
+    public boolean isRedelivered()
+    {
+        return false;
+    }
+}