You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2008/08/15 05:41:24 UTC

svn commit: r686136 [12/17] - in /incubator/qpid/branches/qpid.0-10/java: ./ broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/ broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/example/ broker/bin/ broker/etc/ broker...

Modified: incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java Thu Aug 14 20:40:49 2008
@@ -25,10 +25,12 @@
 import javax.jms.IllegalStateException;
 
 import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQUndeliveredException;
 import org.apache.qpid.client.failover.FailoverException;
 import org.apache.qpid.client.failover.FailoverProtectedOperation;
 import org.apache.qpid.client.failover.FailoverRetrySupport;
-import org.apache.qpid.client.message.MessageFactoryRegistry;
+import org.apache.qpid.client.message.*;
+import org.apache.qpid.client.message.AMQMessageDelegateFactory;
 import org.apache.qpid.client.protocol.AMQProtocolHandler;
 import org.apache.qpid.client.state.listener.SpecificMethodFrameListener;
 import org.apache.qpid.common.AMQPFilterTypes;
@@ -40,7 +42,9 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public final class AMQSession_0_8 extends AMQSession
+import java.util.Map;
+
+public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMessageProducer_0_8>
 {
 
     /** Used for debugging. */
@@ -125,10 +129,19 @@
         handler.syncWrite(getProtocolHandler().getMethodRegistry().createTxCommitBody().generateFrame(_channelId), TxCommitOkBody.class);
     }
 
-    public void sendCreateQueue(AMQShortString name, final boolean autoDelete, final boolean durable, final boolean exclusive) throws AMQException,
+    public void sendCreateQueue(AMQShortString name, final boolean autoDelete, final boolean durable, final boolean exclusive, final Map<String, Object> arguments) throws AMQException,
             FailoverException
     {
-        QueueDeclareBody body = getMethodRegistry().createQueueDeclareBody(getTicket(),name,false,durable,exclusive,autoDelete,false,null);
+        FieldTable table = null;
+        if(arguments != null && !arguments.isEmpty())
+        {
+            table = new FieldTable();
+            for(Map.Entry<String, Object> entry : arguments.entrySet())
+            {
+                table.setObject(entry.getKey(), entry.getValue());
+            }
+        }
+        QueueDeclareBody body = getMethodRegistry().createQueueDeclareBody(getTicket(),name,false,durable,exclusive,autoDelete,false,table);
         AMQFrame queueDeclare = body.generateFrame(_channelId);
         getProtocolHandler().syncWrite(queueDeclare, QueueDeclareOkBody.class);
     }
@@ -206,6 +219,7 @@
         return isQueueBound(destination.getExchangeName(),destination.getAMQQueueName(),destination.getAMQQueueName());
     }
 
+
     public boolean isQueueBound(final AMQShortString exchangeName, final AMQShortString queueName, final AMQShortString routingKey)
             throws JMSException
     {
@@ -233,10 +247,14 @@
         {
             throw new JMSAMQException("Queue bound query failed: " + e.getMessage(), e);
         }
-    }
+    }    
 
-    public void sendConsume(BasicMessageConsumer consumer, AMQShortString queueName, AMQProtocolHandler protocolHandler, boolean nowait,
-            String messageSelector, AMQShortString tag) throws AMQException, FailoverException
+    @Override public void sendConsume(BasicMessageConsumer_0_8 consumer,
+                                      AMQShortString queueName,
+                                      AMQProtocolHandler protocolHandler,
+                                      boolean nowait,
+                                      String messageSelector,
+                                      int tag) throws AMQException, FailoverException
     {
         FieldTable arguments = FieldTableFactory.newFieldTable();
         if ((messageSelector != null) && !messageSelector.equals(""))
@@ -256,7 +274,7 @@
 
         BasicConsumeBody body = getMethodRegistry().createBasicConsumeBody(getTicket(),
                                                                            queueName,
-                                                                           tag,
+                                                                           new AMQShortString(String.valueOf(tag)),
                                                                            consumer.isNoLocal(),
                                                                            consumer.getAcknowledgeMode() == Session.NO_ACKNOWLEDGE,
                                                                            consumer.isExclusive(),
@@ -314,18 +332,18 @@
     }
 
     public BasicMessageConsumer_0_8 createMessageConsumer(final AMQDestination destination, final int prefetchHigh,
-            final int prefetchLow, final boolean noLocal, final boolean exclusive, String messageSelector, final FieldTable ft,
+            final int prefetchLow, final boolean noLocal, final boolean exclusive, String messageSelector, final FieldTable arguments,
             final boolean noConsume, final boolean autoClose)  throws JMSException
     {
 
         final AMQProtocolHandler protocolHandler = getProtocolHandler();
        return new BasicMessageConsumer_0_8(_channelId, _connection, destination, messageSelector, noLocal,
-                                 _messageFactoryRegistry,this, protocolHandler, ft, prefetchHigh, prefetchLow,
+                                 _messageFactoryRegistry,this, protocolHandler, arguments, prefetchHigh, prefetchLow,
                                  exclusive, _acknowledgeMode, noConsume, autoClose);
     }
 
 
-    public BasicMessageProducer createMessageProducer(final Destination destination, final boolean mandatory,
+    public BasicMessageProducer_0_8 createMessageProducer(final Destination destination, final boolean mandatory,
             final boolean immediate, final boolean waitUntilSent, long producerId)
     {
 
@@ -333,6 +351,66 @@
                                  this, getProtocolHandler(), producerId, immediate, mandatory, waitUntilSent);
     }
 
+
+    @Override public void messageReceived(UnprocessedMessage message)
+    {
+
+        if (message instanceof ReturnMessage)
+        {
+            // Return of the bounced message.
+            returnBouncedMessage((ReturnMessage) message);
+        }
+        else
+        {
+            super.messageReceived(message);
+        }
+    }
+
+    private void returnBouncedMessage(final ReturnMessage msg)
+    {
+        _connection.performConnectionTask(new Runnable()
+        {
+            public void run()
+            {
+                try
+                {
+                    // Bounced message is processed here, away from the mina thread
+                    AbstractJMSMessage bouncedMessage =
+                            _messageFactoryRegistry.createMessage(0, false, msg.getExchange(),
+                                                                  msg.getRoutingKey(), msg.getContentHeader(), msg.getBodies());
+                    AMQConstant errorCode = AMQConstant.getConstant(msg.getReplyCode());
+                    AMQShortString reason = msg.getReplyText();
+                    _logger.debug("Message returned with error code " + errorCode + " (" + reason + ")");
+
+                    // @TODO should this be moved to an exception handler of sorts. Somewhere errors are converted to correct execeptions.
+                    if (errorCode == AMQConstant.NO_CONSUMERS)
+                    {
+                        _connection.exceptionReceived(new AMQNoConsumersException("Error: " + reason, bouncedMessage, null));
+                    }
+                    else if (errorCode == AMQConstant.NO_ROUTE)
+                    {
+                        _connection.exceptionReceived(new AMQNoRouteException("Error: " + reason, bouncedMessage, null));
+                    }
+                    else
+                    {
+                        _connection.exceptionReceived(
+                                new AMQUndeliveredException(errorCode, "Error: " + reason, bouncedMessage, null));
+                    }
+
+                }
+                catch (Exception e)
+                {
+                    _logger.error(
+                            "Caught exception trying to raise undelivered message exception (dump follows) - ignoring...",
+                            e);
+                }
+            }
+        });
+    }
+
+
+
+
     public void sendRollback() throws AMQException, FailoverException
     {
         TxRollbackBody body = getMethodRegistry().createTxRollbackBody();
@@ -353,7 +431,7 @@
         checkNotClosed();
         AMQTopic origTopic = checkValidTopic(topic);
         AMQTopic dest = AMQTopic.createDurableTopic(origTopic, name, _connection);
-        TopicSubscriberAdaptor subscriber = _subscriptions.get(name);
+        TopicSubscriberAdaptor<BasicMessageConsumer_0_8> subscriber = _subscriptions.get(name);
         if (subscriber != null)
         {
             if (subscriber.getTopic().equals(topic))
@@ -412,6 +490,28 @@
         return subscriber;
     }
 
+
+
+
+    public void setPrefecthLimits(final int messagePrefetch, final long sizePrefetch) throws AMQException
+    {
+        new FailoverRetrySupport<Object, AMQException>(
+                new FailoverProtectedOperation<Object, AMQException>()
+                {
+                    public Object execute() throws AMQException, FailoverException
+                    {
+
+                        BasicQosBody basicQosBody = getProtocolHandler().getMethodRegistry().createBasicQosBody(sizePrefetch, messagePrefetch, false);
+
+                        // todo send low water mark when protocol allows.
+                        // todo Be aware of possible changes to parameter order as versions change.
+                        getProtocolHandler().syncWrite(basicQosBody.generateFrame(getChannelId()), BasicQosOkBody.class);
+                  
+                        return null;
+                    }
+                 }, _connection).execute();
+    }
+
     class QueueDeclareOkHandler extends SpecificMethodFrameListener
     {
 
@@ -437,7 +537,7 @@
 
     }
 
-    Long requestQueueDepth(AMQDestination amqd) throws AMQException, FailoverException
+    protected Long requestQueueDepth(AMQDestination amqd) throws AMQException, FailoverException
     {
         AMQFrame queueDeclare =
             getMethodRegistry().createQueueDeclareBody(getTicket(),
@@ -449,18 +549,23 @@
                                                        false,
                                                        null).generateFrame(_channelId);
         QueueDeclareOkHandler okHandler = new QueueDeclareOkHandler();
-        getProtocolHandler().writeCommandFrameAndWaitForReply(queueDeclare, okHandler);
+        getProtocolHandler().writeCommandFrameAndWaitForReply(queueDeclare, okHandler);        
         return okHandler._messageCount;
     }
 
-    final boolean tagLE(long tag1, long tag2)
+    protected final boolean tagLE(long tag1, long tag2)
     {
         return tag1 <= tag2;
     }
 
-    final boolean updateRollbackMark(long currentMark, long deliveryTag)
+    protected final boolean updateRollbackMark(long currentMark, long deliveryTag)
     {
         return false;
     }
 
+    public AMQMessageDelegateFactory getMessageDelegateFactory()
+    {
+        return AMQMessageDelegateFactory.FACTORY_0_8;
+    }
+
 }

Modified: incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java Thu Aug 14 20:40:49 2008
@@ -22,9 +22,7 @@
 
 import org.apache.qpid.AMQException;
 import org.apache.qpid.client.failover.FailoverException;
-import org.apache.qpid.client.message.AbstractJMSMessage;
-import org.apache.qpid.client.message.MessageFactoryRegistry;
-import org.apache.qpid.client.message.UnprocessedMessage;
+import org.apache.qpid.client.message.*;
 import org.apache.qpid.client.protocol.AMQProtocolHandler;
 import org.apache.qpid.framing.*;
 import org.apache.qpid.jms.MessageConsumer;
@@ -48,7 +46,7 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
-public abstract class BasicMessageConsumer<H, B> extends Closeable implements MessageConsumer
+public abstract class BasicMessageConsumer<U> extends Closeable implements MessageConsumer
 {
     private static final Logger _logger = LoggerFactory.getLogger(BasicMessageConsumer.class);
 
@@ -71,7 +69,7 @@
     private final AtomicReference<MessageListener> _messageListener = new AtomicReference<MessageListener>();
 
     /** The consumer tag allows us to close the consumer by sending a jmsCancel method to the broker */
-    protected AMQShortString _consumerTag;
+    protected int _consumerTag;
 
     /** We need to know the channel id when constructing frames */
     protected final int _channelId;
@@ -91,7 +89,7 @@
     /**
      * We need to store the "raw" field table so that we can resubscribe in the event of failover being required
      */
-    private final FieldTable _rawSelectorFieldTable;
+    private final FieldTable _arguments;
 
     /**
      * We store the high water prefetch field in order to be able to reuse it when resubscribing in the event of
@@ -168,7 +166,7 @@
     protected BasicMessageConsumer(int channelId, AMQConnection connection, AMQDestination destination,
                                    String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory,
                                    AMQSession session, AMQProtocolHandler protocolHandler,
-                                   FieldTable rawSelectorFieldTable, int prefetchHigh, int prefetchLow,
+                                   FieldTable arguments, int prefetchHigh, int prefetchLow,
                                    boolean exclusive, int acknowledgeMode, boolean noConsume, boolean autoClose)
     {
         _channelId = channelId;
@@ -179,7 +177,7 @@
         _messageFactory = messageFactory;
         _session = session;
         _protocolHandler = protocolHandler;
-        _rawSelectorFieldTable = rawSelectorFieldTable;
+        _arguments = arguments;
         _prefetchHigh = prefetchHigh;
         _prefetchLow = prefetchLow;
         _exclusive = exclusive;
@@ -277,6 +275,14 @@
                     _messageListener.set(messageListener);
                     _session.setHasMessageListeners();
                     _session.startDispatcherIfNecessary();
+                    
+                    // If we already have messages on the queue, deliver them to the listener
+                    Object o = _synchronousQueue.poll();
+                    while (o != null)
+                    {
+                        messageListener.onMessage((Message) o);
+                        o = _synchronousQueue.poll();
+                    }
                 }
             }
         }
@@ -335,9 +341,9 @@
         _receivingThread = null;
     }
 
-    public FieldTable getRawSelectorFieldTable()
+    public FieldTable getArguments()
     {
-        return _rawSelectorFieldTable;
+        return _arguments;
     }
 
     public int getPrefetch()
@@ -508,6 +514,12 @@
 
             throw e;
         }
+        else if (o instanceof CloseConsumerMessage)
+        {
+            _closed.set(true);
+            deregisterConsumer();
+            return null;
+        }
         else
         {
             return (AbstractJMSMessage) o;
@@ -562,6 +574,7 @@
             }
             else
             {
+            	// FIXME: wow this is ugly
                 // //fixme this probably is not right
                 // if (!isNoConsume())
                 { // done in BasicCancelOK Handler but not sending one so just deregister.
@@ -606,7 +619,8 @@
                 }
                 else
                 {
-                    _closedStack = Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 8);
+                	StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
+                    _closedStack = Arrays.asList(stackTrace).subList(3, stackTrace.length - 1);
                 }
             }
         }
@@ -615,25 +629,56 @@
     }
 
     /**
+     * @param closeMessage
+     *            this message signals that we should close the browser
+     */
+    public void notifyCloseMessage(CloseConsumerMessage closeMessage)
+    {
+        if (isMessageListenerSet())
+        {
+            // Currently only possible to get this msg type with a browser.
+            // If we get the message here then we should probably just close
+            // this consumer.
+            // Though an AutoClose consumer with message listener is quite odd..
+            // Just log out the fact so we know where we are
+            _logger.warn("Using an AutoCloseconsumer with message listener is not supported.");
+        }
+        else
+        {
+            try
+            {
+                _synchronousQueue.put(closeMessage);
+            }
+            catch (InterruptedException e)
+            {
+                _logger.info(" SynchronousQueue.put interupted. Usually result of connection closing,"
+                        + "but we shouldn't have close yet");
+            }
+        }
+    }
+
+    
+    /**
      * Called from the AMQSession when a message has arrived for this consumer. This methods handles both the case of a
      * message listener or a synchronous receive() caller.
      *
      * @param messageFrame the raw unprocessed mesage
      */
-    void notifyMessage(UnprocessedMessage messageFrame)
+    void notifyMessage(U messageFrame)
     {
-        final boolean debug = _logger.isDebugEnabled();
-
-        if (debug)
+        if (messageFrame instanceof CloseConsumerMessage)
         {
-            _logger.debug("notifyMessage called with message number " + messageFrame.getDeliveryTag());
+            notifyCloseMessage((CloseConsumerMessage) messageFrame);
+            return;
         }
 
+
+
         try
         {
-            AbstractJMSMessage jmsMessage = createJMSMessageFromUnprocessedMessage(messageFrame);
+            AbstractJMSMessage jmsMessage = createJMSMessageFromUnprocessedMessage(_session.getMessageDelegateFactory(), messageFrame);
 
-            if (debug)
+            if (_logger.isDebugEnabled())
             {
                 _logger.debug("Message is of type: " + jmsMessage.getClass().getName());
             }
@@ -668,7 +713,7 @@
         }
     }
 
-    public abstract AbstractJMSMessage createJMSMessageFromUnprocessedMessage(UnprocessedMessage<H, B> messageFrame)
+    public abstract AbstractJMSMessage createJMSMessageFromUnprocessedMessage(AMQMessageDelegateFactory delegateFactory, U messageFrame)
             throws Exception;
 
     /** @param jmsMessage this message has already been processed so can't redo preDeliver */
@@ -678,18 +723,9 @@
         {
             if (isMessageListenerSet())
             {
-                // we do not need a lock around the test above, and the dispatch below as it is invalid
-                // for an application to alter an installed listener while the session is started
-                // synchronized (_closed)
-                {
-                    // if (!_closed.get())
-                    {
-
-                        preApplicationProcessing(jmsMessage);
-                        getMessageListener().onMessage(jmsMessage);
-                        postDeliver(jmsMessage);
-                    }
-                }
+                preApplicationProcessing(jmsMessage);
+                getMessageListener().onMessage(jmsMessage);
+                postDeliver(jmsMessage);
             }
             else
             {
@@ -750,7 +786,7 @@
                 {
                     _session.acknowledgeMessage(msg.getDeliveryTag(), false);
                 }
-
+                _session.markDirty();
                 break;
 
             case Session.DUPS_OK_ACKNOWLEDGE:
@@ -892,12 +928,12 @@
         _session.deregisterConsumer(this);
     }
 
-    public AMQShortString getConsumerTag()
+    public int getConsumerTag()
     {
         return _consumerTag;
     }
 
-    public void setConsumerTag(AMQShortString consumerTag)
+    public void setConsumerTag(int consumerTag)
     {
         _consumerTag = consumerTag;
     }

Modified: incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java Thu Aug 14 20:40:49 2008
@@ -22,31 +22,24 @@
 import org.apache.qpid.client.message.*;
 import org.apache.qpid.client.protocol.AMQProtocolHandler;
 import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.AMQException;
-import org.apache.qpid.jms.*;
-import org.apache.qpid.exchange.ExchangeDefaults;
 import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpidity.api.Message;
-import org.apache.qpidity.transport.*;
-import org.apache.qpidity.transport.Session;
-import org.apache.qpidity.QpidException;
-import org.apache.qpidity.filter.MessageFilter;
-import org.apache.qpidity.filter.JMSSelectorFilter;
+import org.apache.qpid.transport.*;
+import org.apache.qpid.QpidException;
+import org.apache.qpid.filter.MessageFilter;
+import org.apache.qpid.filter.JMSSelectorFilter;
 
 import javax.jms.InvalidSelectorException;
 import javax.jms.JMSException;
 import javax.jms.MessageListener;
-import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.Iterator;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * This is a 0.10 message consumer.
  */
-public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], ByteBuffer>
-        implements org.apache.qpidity.nclient.util.MessageListener
+public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedMessage_0_10>
+        implements org.apache.qpid.nclient.MessagePartListener
 {
 
     /**
@@ -78,17 +71,18 @@
      * Specify whether this consumer is performing a sync receive
      */
     private final AtomicBoolean _syncReceive = new AtomicBoolean(false);
+    private String _consumerTagString;
 
     //--- constructor
     protected BasicMessageConsumer_0_10(int channelId, AMQConnection connection, AMQDestination destination,
                                         String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory,
                                         AMQSession session, AMQProtocolHandler protocolHandler,
-                                        FieldTable rawSelectorFieldTable, int prefetchHigh, int prefetchLow,
+                                        FieldTable arguments, int prefetchHigh, int prefetchLow,
                                         boolean exclusive, int acknowledgeMode, boolean noConsume, boolean autoClose)
             throws JMSException
     {
         super(channelId, connection, destination, messageSelector, noLocal, messageFactory, session, protocolHandler,
-              rawSelectorFieldTable, prefetchHigh, prefetchLow, exclusive, acknowledgeMode, noConsume, autoClose);
+                arguments, prefetchHigh, prefetchLow, exclusive, acknowledgeMode, noConsume, autoClose);
         _0_10session = (AMQSession_0_10) session;
         if (messageSelector != null && !messageSelector.equals(""))
         {
@@ -108,7 +102,20 @@
         _isStarted = connection.started();
     }
 
-    // ----- Interface org.apache.qpidity.client.util.MessageListener
+
+    @Override public void setConsumerTag(int consumerTag)
+    {
+        super.setConsumerTag(consumerTag);
+        _consumerTagString = String.valueOf(consumerTag);
+    }
+
+    public String getConsumerTagString()
+    {
+        return _consumerTagString;
+    }
+
+
+    // ----- Interface org.apache.qpid.client.util.MessageListener
 
     /**
      *
@@ -144,7 +151,7 @@
         {
             if (isMessageListenerSet() && ! getSession().prefetch())
             {
-                _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),
+                _0_10session.getQpidSession().messageFlow(getConsumerTagString(),
                                                           MessageCreditUnit.MESSAGE, 1);
             }
             _logger.debug("messageOk, trying to notify");
@@ -157,52 +164,19 @@
     /**
      * This method is invoked by the transport layer when a message is delivered for this
      * consumer. The message is transformed and pass to the session.
-     * @param message an 0.10 message
+     * @param xfr an 0.10 message transfer
      */
-    public void onMessage(Message message)
+    public void messageTransfer(MessageTransfer xfr)
+
+    //public void onMessage(Message message)
     {
         int channelId = getSession().getChannelId();
-        long deliveryId = message.getMessageTransferId();
-        AMQShortString consumerTag = getConsumerTag();
-        AMQShortString exchange;
-        AMQShortString routingKey;
-        boolean redelivered = false;
-        Struct[] headers = {message.getMessageProperties(), message.getDeliveryProperties()};
-        if (headers[0] == null) {
-            headers[0] = new MessageProperties(); 
-        }
-        if( message.getDeliveryProperties() != null )
-        {
-            exchange = new AMQShortString(message.getDeliveryProperties().getExchange());
-            routingKey = new AMQShortString(message.getDeliveryProperties().getRoutingKey());
-            redelivered = message.getDeliveryProperties().getRedelivered();
-        }
-        else
-        {
-            exchange = new AMQShortString("");
-            routingKey = new AMQShortString("");
-            headers[1] = new DeliveryProperties();
-        }
+        int consumerTag = getConsumerTag();
+
         UnprocessedMessage_0_10 newMessage =
-                new UnprocessedMessage_0_10(channelId, deliveryId, consumerTag, exchange, routingKey, redelivered);
-        try
-        {
-            newMessage.receiveBody(message.readData());
-        }
-        catch (IOException e)
-        {
-            getSession().getAMQConnection().exceptionReceived(e);
-        }
-        // if there is a replyto destination then we need to request the exchange info
-        ReplyTo replyTo = ((MessageProperties) headers[0]).getReplyTo();
-        if (replyTo != null && replyTo.getExchange() != null && !replyTo.getExchange().equals(""))
-        {
-            // <exch_class>://<exch_name>/[<destination>]/[<queue>]?<option>='<value>'[,<option>='<value>']*
-            // the exchnage class will be set later from within the sesion thread
-            String replyToUrl =  replyTo.getExchange() + "/" + replyTo.getRoutingKey() + "/" + replyTo.getRoutingKey();
-            newMessage.setReplyToURL(replyToUrl);
-        }
-        newMessage.setContentHeader(headers);
+            new UnprocessedMessage_0_10(consumerTag, xfr);
+
+
         getSession().messageReceived(newMessage);
         // else ignore this message
     }
@@ -215,47 +189,16 @@
      */
     @Override void sendCancel() throws AMQException
     {
-        ((AMQSession_0_10) getSession()).getQpidSession().messageCancel(getConsumerTag().toString());
+        ((AMQSession_0_10) getSession()).getQpidSession().messageCancel(getConsumerTagString());
         ((AMQSession_0_10) getSession()).getQpidSession().sync();
         // confirm cancel
         getSession().confirmConsumerCancelled(getConsumerTag());
         ((AMQSession_0_10) getSession()).getCurrentException();
     }
 
-    @Override void notifyMessage(UnprocessedMessage messageFrame)
+    @Override void notifyMessage(UnprocessedMessage_0_10 messageFrame)
     {
-        // if there is a replyto destination then we need to request the exchange info
-        String replyToURL = messageFrame.getReplyToURL();
-        if (replyToURL != null && !replyToURL.equals(""))
-        {
-            AMQShortString  shortExchangeName = new AMQShortString( replyToURL.substring(0, replyToURL.indexOf('/')));
-            String replyToUrl = "://" + replyToURL;
-            if (shortExchangeName.equals(ExchangeDefaults.TOPIC_EXCHANGE_NAME))
-            {
-                replyToUrl = ExchangeDefaults.TOPIC_EXCHANGE_CLASS + replyToUrl;
-            }
-            else if (shortExchangeName.equals(ExchangeDefaults.DIRECT_EXCHANGE_NAME))
-            {
-                replyToUrl = ExchangeDefaults.DIRECT_EXCHANGE_CLASS + replyToUrl;
-            }
-            else if (shortExchangeName.equals(ExchangeDefaults.HEADERS_EXCHANGE_NAME))
-            {
-                replyToUrl = ExchangeDefaults.HEADERS_EXCHANGE_CLASS + replyToUrl;
-            }
-            else if (shortExchangeName.equals(ExchangeDefaults.FANOUT_EXCHANGE_NAME))
-            {
-                replyToUrl = ExchangeDefaults.FANOUT_EXCHANGE_CLASS + replyToUrl;
-            }
-            else
-            {
-                Future<ExchangeQueryResult> future =
-                        ((AMQSession_0_10) getSession()).getQpidSession().exchangeQuery(shortExchangeName.toString());
-                ExchangeQueryResult res = future.get();
-                // <exch_class>://<exch_name>/[<destination>]/[<queue>]?<option>='<value>'[,<option>='<value>']*
-                replyToUrl = res.getType() + replyToUrl;
-            }
-            ((UnprocessedMessage_0_10) messageFrame).setReplyToURL(replyToUrl);
-        }
+
         super.notifyMessage(messageFrame);
     }
 
@@ -269,12 +212,10 @@
     }
 
     @Override public AbstractJMSMessage createJMSMessageFromUnprocessedMessage(
-            UnprocessedMessage<Struct[], ByteBuffer> messageFrame) throws Exception
+            AMQMessageDelegateFactory delegateFactory, UnprocessedMessage_0_10 msg) throws Exception
     {
-        return _messageFactory.createMessage(messageFrame.getDeliveryTag(), messageFrame.isRedelivered(),
-                                             messageFrame.getExchange(), messageFrame.getRoutingKey(),
-                                             messageFrame.getContentHeader(), messageFrame.getBodies(),
-                                             messageFrame.getReplyToURL());
+        AMQMessageDelegate_0_10.updateExchangeTypeMapping(msg.getMessageTransfer().getHeader(), ((AMQSession_0_10)getSession()).getQpidSession());
+        return _messageFactory.createMessage(msg.getMessageTransfer());
     }
 
     // private methods
@@ -330,7 +271,7 @@
             // and messages are not prefetched we then need to request another one
             if(! getSession().prefetch())
             {
-               _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),
+               _0_10session.getQpidSession().messageFlow(getConsumerTagString(),
                                                          MessageCreditUnit.MESSAGE, 1);
             }
         }
@@ -418,7 +359,7 @@
         super.setMessageListener(messageListener);
         if (messageListener != null && ! getSession().prefetch())
         {
-            _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),
+            _0_10session.getQpidSession().messageFlow(getConsumerTagString(),
                                                       MessageCreditUnit.MESSAGE, 1);
         }
         if (messageListener != null && !_synchronousQueue.isEmpty())
@@ -443,7 +384,7 @@
         _isStarted = true;
         if (_syncReceive.get())
         {
-            _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),
+            _0_10session.getQpidSession().messageFlow(getConsumerTagString(),
                                                       MessageCreditUnit.MESSAGE, 1);
         }
     }
@@ -466,7 +407,7 @@
     {
         if (isStrated() && ! getSession().prefetch() && _synchronousQueue.isEmpty())
         {
-            _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),
+            _0_10session.getQpidSession().messageFlow(getConsumerTagString(),
                                                       MessageCreditUnit.MESSAGE, 1);
         }
         if (! getSession().prefetch())
@@ -489,4 +430,5 @@
           _session.acknowledgeMessage(msg.getDeliveryTag(), false);                
         }               
     }
+
 }

Modified: incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java Thu Aug 14 20:40:49 2008
@@ -20,40 +20,48 @@
  */
 package org.apache.qpid.client;
 
-import java.util.concurrent.TimeUnit;
+import javax.jms.InvalidSelectorException;
+import javax.jms.JMSException;
 
 import org.apache.qpid.AMQException;
+import org.apache.qpid.QpidException;
 import org.apache.qpid.client.failover.FailoverException;
-import org.apache.qpid.client.message.AbstractJMSMessage;
-import org.apache.qpid.client.message.MessageFactoryRegistry;
-import org.apache.qpid.client.message.UnprocessedMessage;
+import org.apache.qpid.client.message.*;
 import org.apache.qpid.client.protocol.AMQProtocolHandler;
-import org.apache.qpid.framing.AMQFrame;
-import org.apache.qpid.framing.BasicCancelBody;
-import org.apache.qpid.framing.BasicCancelOkBody;
-import org.apache.qpid.framing.ContentBody;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.filter.JMSSelectorFilter;
+import org.apache.qpid.framing.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<ContentHeaderBody,ContentBody>
+public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<UnprocessedMessage_0_8>
 {
     protected final Logger _logger = LoggerFactory.getLogger(getClass());
 
     protected BasicMessageConsumer_0_8(int channelId, AMQConnection connection, AMQDestination destination,
             String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession session,
-            AMQProtocolHandler protocolHandler, FieldTable rawSelectorFieldTable, int prefetchHigh, int prefetchLow,
-            boolean exclusive, int acknowledgeMode, boolean noConsume, boolean autoClose)
+            AMQProtocolHandler protocolHandler, FieldTable arguments, int prefetchHigh, int prefetchLow,
+            boolean exclusive, int acknowledgeMode, boolean noConsume, boolean autoClose) throws JMSException
     {
         super(channelId, connection, destination,messageSelector,noLocal,messageFactory,session,
-              protocolHandler, rawSelectorFieldTable, prefetchHigh, prefetchLow, exclusive,
+              protocolHandler, arguments, prefetchHigh, prefetchLow, exclusive,
               acknowledgeMode, noConsume, autoClose);
+        try
+        {
+            
+            if (messageSelector != null && messageSelector.length() > 0)
+            {
+                JMSSelectorFilter _filter = new JMSSelectorFilter(messageSelector);
+            }
+        }
+        catch (QpidException e)
+        {
+            throw new InvalidSelectorException("cannot create consumer because of selector issue");
+        }
     }
 
     void sendCancel() throws AMQException, FailoverException
     {
-        BasicCancelBody body = getSession().getMethodRegistry().createBasicCancelBody(_consumerTag, false);
+        BasicCancelBody body = getSession().getMethodRegistry().createBasicCancelBody(new AMQShortString(String.valueOf(_consumerTag)), false);
 
         final AMQFrame cancelFrame = body.generateFrame(_channelId);
 
@@ -65,7 +73,7 @@
         }
     }
 
-     public AbstractJMSMessage createJMSMessageFromUnprocessedMessage(UnprocessedMessage<ContentHeaderBody, ContentBody> messageFrame)throws Exception
+     public AbstractJMSMessage createJMSMessageFromUnprocessedMessage(AMQMessageDelegateFactory delegateFactory, UnprocessedMessage_0_8 messageFrame)throws Exception
      {
 
         return _messageFactory.createMessage(messageFrame.getDeliveryTag(),
@@ -74,4 +82,4 @@
 
     }
 
-}
\ No newline at end of file
+}

Modified: incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java Thu Aug 14 20:40:49 2008
@@ -31,23 +31,16 @@
 import javax.jms.MapMessage;
 import javax.jms.Message;
 import javax.jms.ObjectMessage;
-import javax.jms.Queue;
 import javax.jms.StreamMessage;
 import javax.jms.TextMessage;
-import javax.jms.Topic;
 
-import org.apache.mina.common.ByteBuffer;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.client.message.AbstractJMSMessage;
 import org.apache.qpid.client.message.MessageConverter;
 import org.apache.qpid.client.protocol.AMQProtocolHandler;
-import org.apache.qpid.framing.AMQFrame;
-import org.apache.qpid.framing.BasicContentHeaderProperties;
-import org.apache.qpid.framing.BasicPublishBody;
-import org.apache.qpid.framing.CompositeAMQDataBlock;
 import org.apache.qpid.framing.ContentBody;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.ExchangeDeclareBody;
+import org.apache.qpid.util.UUIDGen;
+import org.apache.qpid.util.UUIDs;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -122,6 +115,8 @@
 
     private boolean _disableMessageId;
 
+    private UUIDGen _messageIdGenerator = UUIDs.newGenerator();
+
     private static final ContentBody[] NO_CONTENT_BODIES = new ContentBody[0];
 
     protected BasicMessageProducer(AMQConnection connection, AMQDestination destination, boolean transacted, int channelId,
@@ -365,27 +360,27 @@
 
             if (message instanceof BytesMessage)
             {
-                newMessage = new MessageConverter((BytesMessage) message).getConvertedMessage();
+                newMessage = new MessageConverter(_session, (BytesMessage) message).getConvertedMessage();
             }
             else if (message instanceof MapMessage)
             {
-                newMessage = new MessageConverter((MapMessage) message).getConvertedMessage();
+                newMessage = new MessageConverter(_session, (MapMessage) message).getConvertedMessage();
             }
             else if (message instanceof ObjectMessage)
             {
-                newMessage = new MessageConverter((ObjectMessage) message).getConvertedMessage();
+                newMessage = new MessageConverter(_session, (ObjectMessage) message).getConvertedMessage();
             }
             else if (message instanceof TextMessage)
             {
-                newMessage = new MessageConverter((TextMessage) message).getConvertedMessage();
+                newMessage = new MessageConverter(_session, (TextMessage) message).getConvertedMessage();
             }
             else if (message instanceof StreamMessage)
             {
-                newMessage = new MessageConverter((StreamMessage) message).getConvertedMessage();
+                newMessage = new MessageConverter(_session, (StreamMessage) message).getConvertedMessage();
             }
             else
             {
-                newMessage = new MessageConverter(message).getConvertedMessage();
+                newMessage = new MessageConverter(_session, message).getConvertedMessage();
             }
 
             if (newMessage != null)
@@ -453,19 +448,18 @@
             }
         }
 
+        UUID messageId = null;
         if (_disableMessageId)
         {
-            message.setJMSMessageID(null);
+            message.setJMSMessageID((UUID)null);
         }
         else
         {
-            StringBuilder b = new StringBuilder(39);
-            b.append("ID:");
-            b.append(UUID.randomUUID());
-            message.setJMSMessageID(b.toString());
+            messageId = _messageIdGenerator.generate();
+            message.setJMSMessageID(messageId);
         }
 
-        sendMessage(destination, origMessage, message, deliveryMode, priority, timeToLive, mandatory, immediate, wait);
+        sendMessage(destination, origMessage, message, messageId, deliveryMode, priority, timeToLive, mandatory, immediate, wait);
 
         if (message != origMessage)
         {
@@ -484,8 +478,8 @@
     }
 
     abstract void sendMessage(AMQDestination destination, Message origMessage, AbstractJMSMessage message,
-                              int deliveryMode, int priority, long timeToLive, boolean mandatory,
-                              boolean immediate, boolean wait)throws JMSException;
+                              UUID messageId, int deliveryMode, int priority, long timeToLive, boolean mandatory,
+                              boolean immediate, boolean wait) throws JMSException;
 
     private void checkTemporaryDestination(AMQDestination destination) throws JMSException
     {

Modified: incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java Thu Aug 14 20:40:49 2008
@@ -22,6 +22,7 @@
 import java.util.HashMap;
 import java.util.Map;
 import java.util.UUID;
+import java.nio.ByteBuffer;
 
 import javax.jms.JMSException;
 import javax.jms.Message;
@@ -29,17 +30,15 @@
 
 import org.apache.qpid.client.message.AbstractJMSMessage;
 import org.apache.qpid.client.message.FiledTableSupport;
+import org.apache.qpid.client.message.AMQMessageDelegate_0_10;
 import org.apache.qpid.client.protocol.AMQProtocolHandler;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.BasicContentHeaderProperties;
 import org.apache.qpid.url.AMQBindingURL;
-import org.apache.qpidity.nclient.util.ByteBufferMessage;
-import org.apache.qpidity.njms.ExceptionHelper;
-import org.apache.qpidity.transport.DeliveryProperties;
-import org.apache.qpidity.transport.MessageDeliveryMode;
-import org.apache.qpidity.transport.MessageDeliveryPriority;
-import org.apache.qpidity.transport.MessageProperties;
-import org.apache.qpidity.transport.ReplyTo;
+import org.apache.qpid.nclient.util.ByteBufferMessage;
+import org.apache.qpid.njms.ExceptionHelper;
+import org.apache.qpid.transport.*;
+import static org.apache.qpid.transport.Option.*;
 
 /**
  * This is a 0_10 message producer.
@@ -68,23 +67,25 @@
      * Sends a message to a given destination
      */
     void sendMessage(AMQDestination destination, Message origMessage, AbstractJMSMessage message,
-                     int deliveryMode, int priority, long timeToLive, boolean mandatory, boolean immediate,
-                     boolean wait) throws JMSException
+                     UUID messageId, int deliveryMode, int priority, long timeToLive, boolean mandatory,
+                     boolean immediate, boolean wait) throws JMSException
     {
         message.prepareForSending();
-        if (message.get010Message() == null)
+
+        AMQMessageDelegate_0_10 delegate = (AMQMessageDelegate_0_10) message.getDelegate();
+
+        DeliveryProperties deliveryProp = delegate.getDeliveryProperties();
+        MessageProperties messageProps = delegate.getMessageProperties();
+
+        if (messageId != null)
         {
-            message.set010Message(new ByteBufferMessage());
+            messageProps.setMessageId(messageId);
         }
-        // force a rebuild of the 0-10 message if data has changed
-        if (message.getData() == null)
+        else if (messageProps.hasMessageId())
         {
-            message.dataChanged();
+            messageProps.clearMessageId();
         }
 
-        DeliveryProperties deliveryProp = message.get010Message().getDeliveryProperties();
-        MessageProperties messageProps = message.get010Message().getMessageProperties();
-        // set the delivery properties
         if (!_disableTimestamps)
         {
             final long currentTime = System.currentTimeMillis();
@@ -124,10 +125,10 @@
             deliveryProp.setPriority(MessageDeliveryPriority.get((short) priority));
             message.setJMSPriority(priority);
         }
-        String excahngeName = destination.getExchangeName().toString();
-        if ( deliveryProp.getExchange() == null || ! deliveryProp.getExchange().equals(excahngeName))
+        String exchangeName = destination.getExchangeName().toString();
+        if ( deliveryProp.getExchange() == null || ! deliveryProp.getExchange().equals(exchangeName))
         {
-            deliveryProp.setExchange(excahngeName);
+            deliveryProp.setExchange(exchangeName);
         }
         String routingKey = destination.getRoutingKey().toString();
         if (deliveryProp.getRoutingKey() == null || ! deliveryProp.getRoutingKey().equals(routingKey))
@@ -135,105 +136,29 @@
             deliveryProp.setRoutingKey(routingKey);
         }
 
-        BasicContentHeaderProperties contentHeaderProperties = message.getContentHeaderProperties();
-        if (contentHeaderProperties.reset())
-        {
-            // set the application properties
-            messageProps.setContentType(contentHeaderProperties.getContentType().toString());
-            messageProps.setContentLength(message.getContentLength());
-
-            // XXX: fixme
-            String mid = message.getJMSMessageID();
-            if( mid != null )
-            {
-                messageProps.setMessageId(UUID.fromString(mid.substring(3)));
-            }
-
-            AMQShortString correlationID = contentHeaderProperties.getCorrelationId();
-            if (correlationID != null)
-            {
-                messageProps.setCorrelationId(correlationID.getBytes());
-            }
-
-            String replyToURL = contentHeaderProperties.getReplyToAsString();
-            if (replyToURL != null)
-            {
-                if(_logger.isDebugEnabled())
-                {
-                    StringBuffer b = new StringBuffer();
-                    b.append("\n==========================");
-                    b.append("\nReplyTo : " + replyToURL);
-                    b.append("\n==========================");
-                    _logger.debug(b.toString());
-                }
-                AMQBindingURL dest;
-                try
-                {
-                    dest = new AMQBindingURL(replyToURL);
-                }
-                catch (URISyntaxException e)
-                {
-                    throw ExceptionHelper.convertQpidExceptionToJMSException(e);
-                }
-                messageProps.setReplyTo(new ReplyTo(dest.getExchangeName().toString(), dest.getRoutingKey().toString()));
-            }
-
-            Map<String,Object> map = null;
-
-            if (contentHeaderProperties.getHeaders() != null)
-            {
-                //JMS_QPID_DESTTYPE   is always set but useles so this is a temporary fix
-                contentHeaderProperties.getHeaders().remove(CustomJMSXProperty.JMS_QPID_DESTTYPE.getShortStringName());
-                map = FiledTableSupport.convertToMap(contentHeaderProperties.getHeaders());
-            }
-
-            AMQShortString type = contentHeaderProperties.getType();
-            if (type != null)
-            {
-                if (map == null)
-                {
-                    map = new HashMap<String,Object>();
-                }
-                map.put(AbstractJMSMessage.JMS_TYPE, type.toString());
-            }
-
-            if (map != null)
-            {
-                messageProps.setApplicationHeaders(map);
-            }
-        }
+        messageProps.setContentLength(message.getContentLength());
 
         // send the message
         try
         {
-            org.apache.qpidity.nclient.Session ssn = ((AMQSession_0_10) getSession()).getQpidSession();
+            org.apache.qpid.transport.Session ssn = (org.apache.qpid.transport.Session)
+                ((AMQSession_0_10) getSession()).getQpidSession();
 
             // if true, we need to sync the delivery of this message
             boolean sync = (deliveryMode == DeliveryMode.PERSISTENT &&
                             getSession().getAMQConnection().getSyncPersistence());
 
+            org.apache.mina.common.ByteBuffer data = message.getData();
+            ByteBuffer buffer = data == null ? ByteBuffer.allocate(0) : data.buf().slice();
+            
+            ssn.messageTransfer(destination.getExchangeName().toString(), MessageAcceptMode.NONE,
+                                MessageAcquireMode.PRE_ACQUIRED,
+                                new Header(deliveryProp, messageProps),
+                    buffer, sync ? SYNC : NONE);
             if (sync)
             {
-                ssn.setAutoSync(true);
-            }
-            try
-            {
-                ssn.messageTransfer(destination.getExchangeName().toString(),
-                                    message.get010Message(),
-                                    ssn.TRANSFER_CONFIRM_MODE_NOT_REQUIRED,
-                                    ssn.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE);
+                ssn.sync();
             }
-            finally
-            {
-                if (sync)
-                {
-                    ssn.setAutoSync(false);
-                }
-            }
-        }
-        catch (IOException e)
-        {
-            throw ExceptionHelper.convertQpidExceptionToJMSException(e);
         }
         catch (RuntimeException rte)
         {

Modified: incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java Thu Aug 14 20:40:49 2008
@@ -20,11 +20,15 @@
  */
 package org.apache.qpid.client;
 
+import java.util.UUID;
+
 import javax.jms.JMSException;
 import javax.jms.Message;
 
 import org.apache.mina.common.ByteBuffer;
 import org.apache.qpid.client.message.AbstractJMSMessage;
+import org.apache.qpid.client.message.AMQMessageDelegate;
+import org.apache.qpid.client.message.AMQMessageDelegate_0_8;
 import org.apache.qpid.client.protocol.AMQProtocolHandler;
 import org.apache.qpid.framing.AMQFrame;
 import org.apache.qpid.framing.BasicConsumeBody;
@@ -65,9 +69,9 @@
         _protocolHandler.writeFrame(declare);
     }
 
-    void sendMessage(AMQDestination destination, Message origMessage,AbstractJMSMessage message,
-                     int deliveryMode,int priority, long timeToLive, boolean mandatory, boolean immediate,
-                     boolean wait) throws JMSException
+    void sendMessage(AMQDestination destination, Message origMessage, AbstractJMSMessage message,
+                     UUID messageId, int deliveryMode,int priority, long timeToLive, boolean mandatory,
+                     boolean immediate, boolean wait) throws JMSException
     {
         BasicPublishBody body = getSession().getMethodRegistry().createBasicPublishBody(_session.getTicket(),
                                                                                         destination.getExchangeName(),
@@ -79,7 +83,8 @@
 
         message.prepareForSending();
         ByteBuffer payload = message.getData();
-        BasicContentHeaderProperties contentHeaderProperties = message.getContentHeaderProperties();
+        AMQMessageDelegate_0_8 delegate = (AMQMessageDelegate_0_8) message.getDelegate();
+        BasicContentHeaderProperties contentHeaderProperties = delegate.getContentHeaderProperties();
 
         if (!_disableTimestamps)
         {

Modified: incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/TopicSubscriberAdaptor.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/TopicSubscriberAdaptor.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/TopicSubscriberAdaptor.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/TopicSubscriberAdaptor.java Thu Aug 14 20:40:49 2008
@@ -32,20 +32,20 @@
  * Wraps a MessageConsumer to fulfill the extended TopicSubscriber contract
  *
  */
-class TopicSubscriberAdaptor implements TopicSubscriber
+class TopicSubscriberAdaptor<C extends BasicMessageConsumer> implements TopicSubscriber
 {
     private final Topic _topic;
-    private final BasicMessageConsumer _consumer;
+    private final C _consumer;
     private final boolean _noLocal;
 
-    TopicSubscriberAdaptor(Topic topic, BasicMessageConsumer consumer, boolean noLocal)
+    TopicSubscriberAdaptor(Topic topic, C consumer, boolean noLocal)
     {
         _topic = topic;
         _consumer = consumer;
         _noLocal = noLocal;
     }
     
-    TopicSubscriberAdaptor(Topic topic, BasicMessageConsumer consumer)
+    TopicSubscriberAdaptor(Topic topic, C consumer)
     {
         this(topic, consumer, consumer.isNoLocal());
     }
@@ -103,7 +103,7 @@
     }
     
     private void checkPreConditions() throws javax.jms.IllegalStateException{
-    	BasicMessageConsumer msgConsumer = (BasicMessageConsumer)_consumer;
+    	C msgConsumer = _consumer;
     	
     	if (msgConsumer.isClosed() ){
 			throw new javax.jms.IllegalStateException("Consumer is closed");
@@ -120,7 +120,7 @@
 		}
 	}
 
-    BasicMessageConsumer getMessageConsumer()
+    C getMessageConsumer()
     {
         return _consumer;
     }

Modified: incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java Thu Aug 14 20:40:49 2008
@@ -21,9 +21,9 @@
 import javax.transaction.xa.XAResource;
 import javax.transaction.xa.Xid;
 
-import org.apache.qpidity.QpidException;
-import org.apache.qpidity.dtx.XidImpl;
-import org.apache.qpidity.transport.*;
+import org.apache.qpid.QpidException;
+import org.apache.qpid.dtx.XidImpl;
+import org.apache.qpid.transport.*;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -76,7 +76,7 @@
             _logger.debug("commit tx branch with xid:  ", xid);
         }
         Future<XaResult> future =
-                _xaSession.getQpidSession().dtxCommit(convertXid(xid), b ? Option.ONE_PHASE : Option.NO_OPTION);
+                _xaSession.getQpidSession().dtxCommit(convertXid(xid), b ? Option.ONE_PHASE : Option.NONE);
 
         // now wait on the future for the result
         XaResult result = null;
@@ -86,7 +86,7 @@
         }
         catch (SessionException e)
         {
-            // we need to restore the qpidity session that has been closed
+            // we need to restore the qpid session that has been closed
             _xaSession.createSession();
             // we should get a single exception
             convertExecutionErrorToXAErr(e.getExceptions().get(0).getErrorCode());
@@ -127,10 +127,11 @@
             default:
                  throw new XAException(XAException.XAER_INVAL);
         }
+        _xaSession.flushAcknowledgments();
         Future<XaResult> future = _xaSession.getQpidSession()
                 .dtxEnd(convertXid(xid),
-                        flag == XAResource.TMFAIL ? Option.FAIL : Option.NO_OPTION,
-                        flag == XAResource.TMSUSPEND ? Option.SUSPEND : Option.NO_OPTION);
+                        flag == XAResource.TMFAIL ? Option.FAIL : Option.NONE,
+                        flag == XAResource.TMSUSPEND ? Option.SUSPEND : Option.NONE);
         // now wait on the future for the result
         XaResult result = null;
         try
@@ -139,7 +140,7 @@
         }
         catch (SessionException e)
         {
-            // we need to restore the qpidity session that has been closed
+            // we need to restore the qpid session that has been closed
             _xaSession.createSession();
             // we should get a single exception
             convertExecutionErrorToXAErr(e.getExceptions().get(0).getErrorCode());
@@ -168,7 +169,7 @@
         }
         catch (SessionException e)
         {
-            // we need to restore the qpidity session that has been closed
+            // we need to restore the qpid session that has been closed
             _xaSession.createSession();
             // we should get a single exception
             convertExecutionErrorToXAErr(e.getExceptions().get(0).getErrorCode());
@@ -198,7 +199,7 @@
             }
             catch (SessionException e)
             {
-                // we need to restore the qpidity session that has been closed
+                // we need to restore the qpid session that has been closed
                 _xaSession.createSession();
                 // we should get a single exception
                 convertExecutionErrorToXAErr(e.getExceptions().get(0).getErrorCode());
@@ -245,7 +246,7 @@
         }
         catch (SessionException e)
         {
-            // we need to restore the qpidity session that has been closed
+            // we need to restore the qpid session that has been closed
             _xaSession.createSession();
             // we should get a single exception
             convertExecutionErrorToXAErr(e.getExceptions().get(0).getErrorCode());
@@ -288,7 +289,7 @@
         }
         catch (SessionException e)
         {
-            // we need to restore the qpidity session that has been closed
+            // we need to restore the qpid session that has been closed
             _xaSession.createSession();
             // we should get a single exception
             convertExecutionErrorToXAErr( e.getExceptions().get(0).getErrorCode());
@@ -297,7 +298,7 @@
         int i = 0;
         for (Object obj : res.getInDoubt())
         {
-            org.apache.qpidity.transport.Xid xid = (org.apache.qpidity.transport.Xid) obj;
+            org.apache.qpid.transport.Xid xid = (org.apache.qpid.transport.Xid) obj;
             result[i] = new XidImpl(xid.getBranchId(), (int) xid.getFormat(), xid.getGlobalId());
             i++;
         }
@@ -326,7 +327,7 @@
         }
         catch (SessionException e)
         {
-            // we need to restore the qpidity session that has been closed
+            // we need to restore the qpid session that has been closed
             _xaSession.createSession();
             // we should get a single exception
             convertExecutionErrorToXAErr( e.getExceptions().get(0).getErrorCode());
@@ -400,8 +401,8 @@
         }
         Future<XaResult> future = _xaSession.getQpidSession()
                 .dtxStart(convertXid(xid),
-                        flag == XAResource.TMJOIN ? Option.JOIN : Option.NO_OPTION,
-                        flag == XAResource.TMRESUME ? Option.RESUME : Option.NO_OPTION);
+                        flag == XAResource.TMJOIN ? Option.JOIN : Option.NONE,
+                        flag == XAResource.TMRESUME ? Option.RESUME : Option.NONE);
         // now wait on the future for the result
         XaResult result = null;
         try
@@ -410,7 +411,7 @@
         }
         catch (SessionException e)
         {
-            // we need to restore the qpidity session that has been closed
+            // we need to restore the qpid session that has been closed
             _xaSession.createSession();
             // we should get a single exception
             convertExecutionErrorToXAErr(e.getExceptions().get(0).getErrorCode());
@@ -511,7 +512,7 @@
      * @return the qpid formated xid
      * @throws XAException when xid is null or when it cannot be converted. 
      */
-    private org.apache.qpidity.transport.Xid convertXid(Xid xid) throws XAException
+    private org.apache.qpid.transport.Xid convertXid(Xid xid) throws XAException
     {
         if (xid == null)
         {

Modified: incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java Thu Aug 14 20:40:49 2008
@@ -17,7 +17,7 @@
  */
 package org.apache.qpid.client;
 
-import org.apache.qpidity.nclient.DtxSession;
+import org.apache.qpid.nclient.DtxSession;
 import org.apache.qpid.client.message.MessageFactoryRegistry;
 
 import javax.jms.*;
@@ -48,7 +48,7 @@
     /**
      * Create a JMS XASession
      */
-    public XASessionImpl(org.apache.qpidity.nclient.Connection qpidConnection, AMQConnection con, int channelId,
+    public XASessionImpl(org.apache.qpid.nclient.Connection qpidConnection, AMQConnection con, int channelId,
                          int defaultPrefetchHigh, int defaultPrefetchLow)
     {
         super(qpidConnection, con, channelId, false,  // this is not a transacted session
@@ -61,7 +61,7 @@
     //-- public methods
 
     /**
-     * Create a qpidity session.
+     * Create a qpid session.
      */
     public void createSession()
     {
@@ -126,7 +126,7 @@
      *
      * @return The associated Qpid Session.
      */
-    protected org.apache.qpidity.nclient.DtxSession getQpidSession()
+    protected org.apache.qpid.nclient.DtxSession getQpidSession()
     {
         return _qpidDtxSession;
     }

Modified: incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java Thu Aug 14 20:40:49 2008
@@ -139,11 +139,15 @@
             // have a state waiter waiting until the connection is closed for some reason. Or in future we may have
             // a slightly more complex state model therefore I felt it was worthwhile doing this.
             AMQStateManager existingStateManager = _amqProtocolHandler.getStateManager();
-            _amqProtocolHandler.setStateManager(new AMQStateManager(_amqProtocolHandler.getProtocolSession()));
+
+            _amqProtocolHandler.setStateManager(new AMQStateManager());
+
+
             if (!_amqProtocolHandler.getConnection().firePreFailover(_host != null))
             {
                 _logger.info("Failover process veto-ed by client");
 
+                //Restore Existing State Manager
                 _amqProtocolHandler.setStateManager(existingStateManager);
 
                 //todo: ritchiem these exceptions are useless... Would be better to attempt to propogate exception that
@@ -181,13 +185,19 @@
 
             if (!failoverSucceeded)
             {
+                //Restore Existing State Manager
                 _amqProtocolHandler.setStateManager(existingStateManager);
+
                 _amqProtocolHandler.getConnection().exceptionReceived(
                         new AMQDisconnectedException("Server closed connection and no failover " +
                                 "was successful", null));
             }
             else
             {
+                // Set the new Protocol Session in the StateManager.               
+                existingStateManager.setProtocolSession(_amqProtocolHandler.getProtocolSession());
+
+                //Restore Existing State Manager
                 _amqProtocolHandler.setStateManager(existingStateManager);
                 try
                 {

Modified: incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/handler/AccessRequestOkMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/handler/AccessRequestOkMethodHandler.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/handler/AccessRequestOkMethodHandler.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/handler/AccessRequestOkMethodHandler.java Thu Aug 14 20:40:49 2008
@@ -5,14 +5,8 @@
 
 import org.apache.qpid.framing.*;
 import org.apache.qpid.client.state.StateAwareMethodListener;
-import org.apache.qpid.client.state.AMQStateManager;
 import org.apache.qpid.client.protocol.AMQProtocolSession;
-import org.apache.qpid.client.AMQNoConsumersException;
-import org.apache.qpid.client.AMQNoRouteException;
 import org.apache.qpid.AMQException;
-import org.apache.qpid.AMQInvalidRoutingKeyException;
-import org.apache.qpid.AMQChannelClosedException;
-import org.apache.qpid.protocol.AMQConstant;
 
 public class AccessRequestOkMethodHandler implements StateAwareMethodListener<AccessRequestOkBody>
 {
@@ -25,11 +19,10 @@
         return _handler;
     }
 
-    public void methodReceived(AMQStateManager stateManager, AccessRequestOkBody method, int channelId)
+    public void methodReceived(AMQProtocolSession session, AccessRequestOkBody method, int channelId)
         throws AMQException
     {
         _logger.debug("AccessRequestOk method received");
-        final AMQProtocolSession session = stateManager.getProtocolSession();
         session.setTicket(method.getTicket(), channelId);
 
     }

Modified: incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/handler/BasicCancelOkMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/handler/BasicCancelOkMethodHandler.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/handler/BasicCancelOkMethodHandler.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/handler/BasicCancelOkMethodHandler.java Thu Aug 14 20:40:49 2008
@@ -22,11 +22,8 @@
 
 import org.apache.qpid.AMQException;
 import org.apache.qpid.client.protocol.AMQProtocolSession;
-import org.apache.qpid.client.state.AMQStateManager;
 import org.apache.qpid.client.state.StateAwareMethodListener;
 import org.apache.qpid.framing.BasicCancelOkBody;
-import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.protocol.AMQMethodEvent;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -45,13 +42,9 @@
     private BasicCancelOkMethodHandler()
     { }
 
-    public void methodReceived(AMQStateManager stateManager, BasicCancelOkBody body, int channelId)
+    public void methodReceived(AMQProtocolSession session, BasicCancelOkBody body, int channelId)
         throws AMQException
     {
-        AMQProtocolSession session = stateManager.getProtocolSession();
-
-
-
         if (_logger.isInfoEnabled())
         {
             _logger.info("New BasicCancelOk method received for consumer:" + body.getConsumerTag());

Modified: incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java Thu Aug 14 20:40:49 2008
@@ -21,10 +21,8 @@
 package org.apache.qpid.client.handler;
 
 import org.apache.qpid.AMQException;
-import org.apache.qpid.client.message.UnprocessedMessage;
 import org.apache.qpid.client.message.UnprocessedMessage_0_8;
 import org.apache.qpid.client.protocol.AMQProtocolSession;
-import org.apache.qpid.client.state.AMQStateManager;
 import org.apache.qpid.client.state.StateAwareMethodListener;
 import org.apache.qpid.framing.BasicDeliverBody;
 import org.slf4j.Logger;
@@ -41,18 +39,16 @@
         return _instance;
     }
 
-    public void methodReceived(AMQStateManager stateManager, BasicDeliverBody body, int channelId)
-        throws AMQException
+    public void methodReceived(AMQProtocolSession session, BasicDeliverBody body, int channelId)
+            throws AMQException
     {
-        final AMQProtocolSession session = stateManager.getProtocolSession();
         final UnprocessedMessage_0_8 msg = new UnprocessedMessage_0_8(
-                channelId,
                 body.getDeliveryTag(),
-                body.getConsumerTag(),
+                body.getConsumerTag().toIntValue(),
                 body.getExchange(),
                 body.getRoutingKey(),
                 body.getRedelivered());
-        _logger.debug("New JmsDeliver method received");
-        session.unprocessedMessageReceived(msg);
+        _logger.debug("New JmsDeliver method received:" + session);
+        session.unprocessedMessageReceived(channelId, msg);
     }
 }

Modified: incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/handler/BasicReturnMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/handler/BasicReturnMethodHandler.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/handler/BasicReturnMethodHandler.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/handler/BasicReturnMethodHandler.java Thu Aug 14 20:40:49 2008
@@ -22,13 +22,9 @@
 
 import org.apache.qpid.AMQException;
 import org.apache.qpid.client.message.ReturnMessage;
-import org.apache.qpid.client.message.UnprocessedMessage;
-import org.apache.qpid.client.message.UnprocessedMessage_0_8;
 import org.apache.qpid.client.protocol.AMQProtocolSession;
-import org.apache.qpid.client.state.AMQStateManager;
 import org.apache.qpid.client.state.StateAwareMethodListener;
 import org.apache.qpid.framing.BasicReturnBody;
-import org.apache.qpid.protocol.AMQMethodEvent;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -45,19 +41,18 @@
     }
 
 
-    public void methodReceived(AMQStateManager stateManager, BasicReturnBody body, int channelId)
+    public void methodReceived(AMQProtocolSession session, BasicReturnBody body, int channelId)
     throws AMQException
     {
         _logger.debug("New JmsBounce method received");
-        final AMQProtocolSession session = stateManager.getProtocolSession();
-        final ReturnMessage msg = new ReturnMessage(channelId,
+        final ReturnMessage msg = new ReturnMessage(
                 body.getExchange(),
                 body.getRoutingKey(),
                 body.getReplyText(),
                 body.getReplyCode()
         );
 
-        session.unprocessedMessageReceived(msg);
+        session.unprocessedMessageReceived(channelId, msg);
     }
 
 }

Modified: incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java Thu Aug 14 20:40:49 2008
@@ -26,14 +26,12 @@
 import org.apache.qpid.client.AMQNoConsumersException;
 import org.apache.qpid.client.AMQNoRouteException;
 import org.apache.qpid.client.protocol.AMQProtocolSession;
-import org.apache.qpid.client.state.AMQStateManager;
 import org.apache.qpid.client.state.StateAwareMethodListener;
 import org.apache.qpid.framing.AMQFrame;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.ChannelCloseBody;
 import org.apache.qpid.framing.ChannelCloseOkBody;
 import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.protocol.AMQMethodEvent;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -49,12 +47,10 @@
         return _handler;
     }
 
-    public void methodReceived(AMQStateManager stateManager, ChannelCloseBody method, int channelId)
+    public void methodReceived(AMQProtocolSession session, ChannelCloseBody method, int channelId)
         throws AMQException
     {
         _logger.debug("ChannelClose method received");
-        final AMQProtocolSession session = stateManager.getProtocolSession();
-
 
         AMQConstant errorCode = AMQConstant.getConstant(method.getReplyCode());
         AMQShortString reason = method.getReplyText();

Modified: incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseOkMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseOkMethodHandler.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseOkMethodHandler.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseOkMethodHandler.java Thu Aug 14 20:40:49 2008
@@ -23,9 +23,7 @@
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.ChannelCloseOkBody;
 import org.apache.qpid.client.protocol.AMQProtocolSession;
-import org.apache.qpid.client.state.AMQStateManager;
 import org.apache.qpid.client.state.StateAwareMethodListener;
-import org.apache.qpid.protocol.AMQMethodEvent;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -41,12 +39,11 @@
         return _instance;
     }
 
-    public void methodReceived(AMQStateManager stateManager,  ChannelCloseOkBody method, int channelId)
+    public void methodReceived(AMQProtocolSession session,  ChannelCloseOkBody method, int channelId)
         throws AMQException
     {
         _logger.info("Received channel-close-ok for channel-id " + channelId);
 
-        final AMQProtocolSession session = stateManager.getProtocolSession();
         // todo this should do the local closure
     }
 }

Modified: incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/handler/ChannelFlowMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/handler/ChannelFlowMethodHandler.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/handler/ChannelFlowMethodHandler.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/handler/ChannelFlowMethodHandler.java Thu Aug 14 20:40:49 2008
@@ -2,7 +2,6 @@
 
 import org.apache.qpid.framing.ChannelFlowBody;
 import org.apache.qpid.client.state.StateAwareMethodListener;
-import org.apache.qpid.client.state.AMQStateManager;
 import org.apache.qpid.client.protocol.AMQProtocolSession;
 import org.apache.qpid.AMQException;
 import org.slf4j.Logger;
@@ -42,11 +41,9 @@
     private ChannelFlowMethodHandler()
     { }
 
-    public void methodReceived(AMQStateManager stateManager, ChannelFlowBody body, int channelId)
+    public void methodReceived(AMQProtocolSession session, ChannelFlowBody body, int channelId)
             throws AMQException
     {
-
-        final AMQProtocolSession session = stateManager.getProtocolSession();
         session.setFlowControl(channelId, body.getActive());
     }
 

Modified: incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/handler/ChannelFlowOkMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/handler/ChannelFlowOkMethodHandler.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/handler/ChannelFlowOkMethodHandler.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/handler/ChannelFlowOkMethodHandler.java Thu Aug 14 20:40:49 2008
@@ -22,10 +22,8 @@
 
 import org.apache.qpid.AMQException;
 import org.apache.qpid.client.protocol.AMQProtocolSession;
-import org.apache.qpid.client.state.AMQStateManager;
 import org.apache.qpid.client.state.StateAwareMethodListener;
 import org.apache.qpid.framing.ChannelFlowOkBody;
-import org.apache.qpid.protocol.AMQMethodEvent;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -43,7 +41,7 @@
     private ChannelFlowOkMethodHandler()
     { }
 
-    public void methodReceived(AMQStateManager stateManager, ChannelFlowOkBody body, int channelId)
+    public void methodReceived(AMQProtocolSession session, ChannelFlowOkBody body, int channelId)
             throws AMQException
     {