You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2012/03/10 20:22:37 UTC

svn commit: r1299257 [16/26] - in /qpid/branches/rg-amqp-1-0-sandbox/qpid/java: broker-plugins/ broker-plugins/access-control/src/main/java/org/apache/qpid/server/security/access/config/ broker-plugins/access-control/src/main/java/org/apache/qpid/serve...

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java?rev=1299257&r1=1299256&r2=1299257&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java Sat Mar 10 19:22:10 2012
@@ -21,13 +21,8 @@
 package org.apache.qpid.client;
 
 
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.ArrayList;
-import java.util.Map;
-
-import javax.jms.Destination;
-import javax.jms.JMSException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import org.apache.qpid.AMQException;
 import org.apache.qpid.AMQUndeliveredException;
@@ -43,44 +38,20 @@ import org.apache.qpid.client.protocol.A
 import org.apache.qpid.client.state.AMQState;
 import org.apache.qpid.client.state.AMQStateManager;
 import org.apache.qpid.client.state.listener.SpecificMethodFrameListener;
-import org.apache.qpid.filter.MessageFilter;
-import org.apache.qpid.framing.AMQFrame;
-import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.BasicAckBody;
-import org.apache.qpid.framing.BasicConsumeBody;
-import org.apache.qpid.framing.BasicConsumeOkBody;
-import org.apache.qpid.framing.BasicQosBody;
-import org.apache.qpid.framing.BasicQosOkBody;
-import org.apache.qpid.framing.BasicRecoverBody;
-import org.apache.qpid.framing.BasicRecoverOkBody;
-import org.apache.qpid.framing.BasicRecoverSyncBody;
-import org.apache.qpid.framing.BasicRecoverSyncOkBody;
-import org.apache.qpid.framing.BasicRejectBody;
-import org.apache.qpid.framing.ChannelCloseOkBody;
-import org.apache.qpid.framing.ChannelFlowBody;
-import org.apache.qpid.framing.ChannelFlowOkBody;
-import org.apache.qpid.framing.ExchangeBoundOkBody;
-import org.apache.qpid.framing.ExchangeDeclareBody;
-import org.apache.qpid.framing.ExchangeDeclareOkBody;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.framing.ProtocolVersion;
-import org.apache.qpid.framing.QueueBindOkBody;
-import org.apache.qpid.framing.QueueDeclareBody;
-import org.apache.qpid.framing.QueueDeclareOkBody;
-import org.apache.qpid.framing.QueueDeleteBody;
-import org.apache.qpid.framing.QueueDeleteOkBody;
-import org.apache.qpid.framing.TxCommitOkBody;
-import org.apache.qpid.framing.TxRollbackBody;
-import org.apache.qpid.framing.TxRollbackOkBody;
+import org.apache.qpid.framing.*;
 import org.apache.qpid.framing.amqp_0_9.MethodRegistry_0_9;
 import org.apache.qpid.framing.amqp_0_91.MethodRegistry_0_91;
 import org.apache.qpid.jms.Session;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.protocol.AMQMethodEvent;
 import org.apache.qpid.transport.TransportException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.Map;
 
 public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMessageProducer_0_8>
 {
@@ -131,7 +102,7 @@ public class AMQSession_0_8 extends AMQS
     {
         while (true)
         {
-            Long tag = _unacknowledgedMessageTags.poll();
+            Long tag = getUnacknowledgedMessageTags().poll();
             if (tag == null)
             {
                 break;
@@ -145,15 +116,15 @@ public class AMQSession_0_8 extends AMQS
     {
         BasicAckBody body = getMethodRegistry().createBasicAckBody(deliveryTag, multiple);
 
-        final AMQFrame ackFrame = body.generateFrame(_channelId);
+        final AMQFrame ackFrame = body.generateFrame(getChannelId());
 
         if (_logger.isDebugEnabled())
         {
-            _logger.debug("Sending ack for delivery tag " + deliveryTag + " on channel " + _channelId);
+            _logger.debug("Sending ack for delivery tag " + deliveryTag + " on channel " + getChannelId());
         }
 
         getProtocolHandler().writeFrame(ackFrame, !isTransacted());
-        _unacknowledgedMessageTags.remove(deliveryTag);
+        getUnacknowledgedMessageTags().remove(deliveryTag);
     }
 
     public void sendQueueBind(final AMQShortString queueName, final AMQShortString routingKey, final FieldTable arguments,
@@ -162,7 +133,7 @@ public class AMQSession_0_8 extends AMQS
     {
         getProtocolHandler().syncWrite(getProtocolHandler().getMethodRegistry().createQueueBindBody
                                         (getTicket(),queueName,exchangeName,routingKey,false,arguments).
-                                        generateFrame(_channelId), QueueBindOkBody.class);
+                                        generateFrame(getChannelId()), QueueBindOkBody.class);
     }
 
     public void sendClose(long timeout) throws AMQException, FailoverException
@@ -179,7 +150,7 @@ public class AMQSession_0_8 extends AMQS
 
             getProtocolHandler().closeSession(this);
             getProtocolHandler().syncWrite(getProtocolHandler().getMethodRegistry().createChannelCloseBody(AMQConstant.REPLY_SUCCESS.getCode(),
-                                                                                                           new AMQShortString("JMS client closing channel"), 0, 0).generateFrame(_channelId),
+                                                                                                           new AMQShortString("JMS client closing channel"), 0, 0).generateFrame(getChannelId()),
                                            ChannelCloseOkBody.class, timeout);
             // When control resumes at this point, a reply will have been received that
             // indicates the broker has closed the channel successfully.
@@ -191,7 +162,7 @@ public class AMQSession_0_8 extends AMQS
         // Acknowledge all delivered messages
         while (true)
         {
-            Long tag = _deliveredMessageTags.poll();
+            Long tag = getDeliveredMessageTags().poll();
             if (tag == null)
             {
                 break;
@@ -202,7 +173,7 @@ public class AMQSession_0_8 extends AMQS
 
         final AMQProtocolHandler handler = getProtocolHandler();
 
-        handler.syncWrite(getProtocolHandler().getMethodRegistry().createTxCommitBody().generateFrame(_channelId), TxCommitOkBody.class);
+        handler.syncWrite(getProtocolHandler().getMethodRegistry().createTxCommitBody().generateFrame(getChannelId()), TxCommitOkBody.class);
     }
 
     public void sendCreateQueue(AMQShortString name, final boolean autoDelete, final boolean durable, final boolean exclusive, final Map<String, Object> arguments) throws AMQException,
@@ -218,22 +189,22 @@ public class AMQSession_0_8 extends AMQS
             }
         }
         QueueDeclareBody body = getMethodRegistry().createQueueDeclareBody(getTicket(),name,false,durable,exclusive,autoDelete,false,table);
-        AMQFrame queueDeclare = body.generateFrame(_channelId);
+        AMQFrame queueDeclare = body.generateFrame(getChannelId());
         getProtocolHandler().syncWrite(queueDeclare, QueueDeclareOkBody.class);
     }
 
     public void sendRecover() throws AMQException, FailoverException
     {
         enforceRejectBehaviourDuringRecover();
-        _prefetchedMessageTags.clear();
-        _unacknowledgedMessageTags.clear();
+        getPrefetchedMessageTags().clear();
+        getUnacknowledgedMessageTags().clear();
 
         if (isStrictAMQP())
         {
             // We can't use the BasicRecoverBody-OK method as it isn't part of the spec.
 
             BasicRecoverBody body = getMethodRegistry().createBasicRecoverBody(false);
-            _connection.getProtocolHandler().writeFrame(body.generateFrame(_channelId));
+            getAMQConnection().getProtocolHandler().writeFrame(body.generateFrame(getChannelId()));
             _logger.warn("Session Recover cannot be guaranteed with STRICT_AMQP. Messages may arrive out of order.");
         }
         else
@@ -243,17 +214,17 @@ public class AMQSession_0_8 extends AMQS
             if(getProtocolHandler().getProtocolVersion().equals(ProtocolVersion.v8_0))
             {
                 BasicRecoverBody body = getMethodRegistry().createBasicRecoverBody(false);
-                _connection.getProtocolHandler().syncWrite(body.generateFrame(_channelId), BasicRecoverOkBody.class);
+                getAMQConnection().getProtocolHandler().syncWrite(body.generateFrame(getChannelId()), BasicRecoverOkBody.class);
             }
             else if(getProtocolVersion().equals(ProtocolVersion.v0_9))
             {
                 BasicRecoverSyncBody body = ((MethodRegistry_0_9)getMethodRegistry()).createBasicRecoverSyncBody(false);
-                _connection.getProtocolHandler().syncWrite(body.generateFrame(_channelId), BasicRecoverSyncOkBody.class);
+                getAMQConnection().getProtocolHandler().syncWrite(body.generateFrame(getChannelId()), BasicRecoverSyncOkBody.class);
             }
             else if(getProtocolVersion().equals(ProtocolVersion.v0_91))
             {
                 BasicRecoverSyncBody body = ((MethodRegistry_0_91)getMethodRegistry()).createBasicRecoverSyncBody(false);
-                _connection.getProtocolHandler().syncWrite(body.generateFrame(_channelId), BasicRecoverSyncOkBody.class);
+                getAMQConnection().getProtocolHandler().syncWrite(body.generateFrame(getChannelId()), BasicRecoverSyncOkBody.class);
             }
             else
             {
@@ -266,9 +237,9 @@ public class AMQSession_0_8 extends AMQS
     {
         if (_logger.isDebugEnabled())
         {
-            _logger.debug("Prefetched message: _unacknowledgedMessageTags :" + _unacknowledgedMessageTags);
+            _logger.debug("Prefetched message: _unacknowledgedMessageTags :" + getUnacknowledgedMessageTags());
         }
-        ArrayList<BasicMessageConsumer_0_8> consumersToCheck = new ArrayList<BasicMessageConsumer_0_8>(_consumers.values());
+        ArrayList<BasicMessageConsumer_0_8> consumersToCheck = new ArrayList<BasicMessageConsumer_0_8>(getConsumers().values());
         boolean messageListenerFound = false;
         boolean serverRejectBehaviourFound = false;
         for(BasicMessageConsumer_0_8 consumer : consumersToCheck)
@@ -287,7 +258,7 @@ public class AMQSession_0_8 extends AMQS
         if (serverRejectBehaviourFound)
         {
             //reject(false) any messages we don't want returned again
-            switch(_acknowledgeMode)
+            switch(getAcknowledgeMode())
             {
                 case Session.DUPS_OK_ACKNOWLEDGE:
                 case Session.AUTO_ACKNOWLEDGE:
@@ -296,7 +267,7 @@ public class AMQSession_0_8 extends AMQS
                         break;
                     }
                 case Session.CLIENT_ACKNOWLEDGE:
-                    for(Long tag : _unacknowledgedMessageTags)
+                    for(Long tag : getUnacknowledgedMessageTags())
                     {
                         rejectMessage(tag, false);
                     }
@@ -314,7 +285,7 @@ public class AMQSession_0_8 extends AMQS
         // consumer on the queue. Whilst this is within the JMS spec it is not
         // user friendly and avoidable.
         boolean normalRejectBehaviour = true;
-        for (BasicMessageConsumer_0_8 consumer : _consumers.values())
+        for (BasicMessageConsumer_0_8 consumer : getConsumers().values())
         {
             if(RejectBehaviour.SERVER.equals(consumer.getRejectBehaviour()))
             {
@@ -326,7 +297,7 @@ public class AMQSession_0_8 extends AMQS
 
         while (true)
         {
-            Long tag = _deliveredMessageTags.poll();
+            Long tag = getDeliveredMessageTags().poll();
             if (tag == null)
             {
                 break;
@@ -338,8 +309,8 @@ public class AMQSession_0_8 extends AMQS
 
     public void rejectMessage(long deliveryTag, boolean requeue)
     {
-        if ((_acknowledgeMode == CLIENT_ACKNOWLEDGE) || (_acknowledgeMode == SESSION_TRANSACTED)||
-                ((_acknowledgeMode == AUTO_ACKNOWLEDGE || _acknowledgeMode == DUPS_OK_ACKNOWLEDGE ) && hasMessageListeners()))
+        if ((getAcknowledgeMode() == CLIENT_ACKNOWLEDGE) || (getAcknowledgeMode() == SESSION_TRANSACTED)||
+                ((getAcknowledgeMode() == AUTO_ACKNOWLEDGE || getAcknowledgeMode() == DUPS_OK_ACKNOWLEDGE ) && hasMessageListeners()))
         {
             if (_logger.isDebugEnabled())
             {
@@ -347,9 +318,9 @@ public class AMQSession_0_8 extends AMQS
             }
 
             BasicRejectBody body = getMethodRegistry().createBasicRejectBody(deliveryTag, requeue);
-            AMQFrame frame = body.generateFrame(_channelId);
+            AMQFrame frame = body.generateFrame(getChannelId());
 
-            _connection.getProtocolHandler().writeFrame(frame);
+            getAMQConnection().getProtocolHandler().writeFrame(frame);
         }
     }
 
@@ -370,12 +341,12 @@ public class AMQSession_0_8 extends AMQS
                         public AMQMethodEvent execute() throws AMQException, FailoverException
                         {
                             AMQFrame boundFrame = getProtocolHandler().getMethodRegistry().createExchangeBoundBody
-                                                    (exchangeName, routingKey, queueName).generateFrame(_channelId);
+                                                    (exchangeName, routingKey, queueName).generateFrame(getChannelId());
 
                             return getProtocolHandler().syncWrite(boundFrame, ExchangeBoundOkBody.class);
 
                         }
-                    }, _connection).execute();
+                    }, getAMQConnection()).execute();
 
             // Extract and return the response code from the query.
             ExchangeBoundOkBody responseBody = (ExchangeBoundOkBody) response.getMethod();
@@ -392,7 +363,6 @@ public class AMQSession_0_8 extends AMQS
                                       AMQShortString queueName,
                                       AMQProtocolHandler protocolHandler,
                                       boolean nowait,
-                                      MessageFilter messageSelector,
                                       int tag) throws AMQException, FailoverException
     {
 
@@ -406,7 +376,7 @@ public class AMQSession_0_8 extends AMQS
                                                                            consumer.getArguments());
 
 
-        AMQFrame jmsConsume = body.generateFrame(_channelId);
+        AMQFrame jmsConsume = body.generateFrame(getChannelId());
 
         if (nowait)
         {
@@ -424,17 +394,25 @@ public class AMQSession_0_8 extends AMQS
         ExchangeDeclareBody body = getMethodRegistry().createExchangeDeclareBody(getTicket(),name,type,
                                                                                  name.toString().startsWith("amq."),
                                                                                  false,false,false,false,null);
-        AMQFrame exchangeDeclare = body.generateFrame(_channelId);
+        AMQFrame exchangeDeclare = body.generateFrame(getChannelId());
 
         protocolHandler.syncWrite(exchangeDeclare, ExchangeDeclareOkBody.class);
     }
 
     public void sendQueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler,
-                                 final boolean nowait) throws AMQException, FailoverException
+                                 final boolean nowait, boolean passive) throws AMQException, FailoverException
     {
-        QueueDeclareBody body = getMethodRegistry().createQueueDeclareBody(getTicket(),amqd.getAMQQueueName(),false,amqd.isDurable(),amqd.isExclusive(),amqd.isAutoDelete(),false,null);
+        QueueDeclareBody body =
+                getMethodRegistry().createQueueDeclareBody(getTicket(),
+                                                           amqd.getAMQQueueName(),
+                                                           passive,
+                                                           amqd.isDurable(),
+                                                           amqd.isExclusive(),
+                                                           amqd.isAutoDelete(),
+                                                           false,
+                                                           null);
 
-        AMQFrame queueDeclare = body.generateFrame(_channelId);
+        AMQFrame queueDeclare = body.generateFrame(getChannelId());
 
         protocolHandler.syncWrite(queueDeclare, QueueDeclareOkBody.class);
     }
@@ -446,7 +424,7 @@ public class AMQSession_0_8 extends AMQS
                                                                          false,
                                                                          false,
                                                                          true);
-        AMQFrame queueDeleteFrame = body.generateFrame(_channelId);
+        AMQFrame queueDeleteFrame = body.generateFrame(getChannelId());
 
         getProtocolHandler().syncWrite(queueDeleteFrame, QueueDeleteOkBody.class);
     }
@@ -454,8 +432,8 @@ public class AMQSession_0_8 extends AMQS
     public void sendSuspendChannel(boolean suspend) throws AMQException, FailoverException
     {
         ChannelFlowBody body = getMethodRegistry().createChannelFlowBody(!suspend);
-        AMQFrame channelFlowFrame = body.generateFrame(_channelId);
-        _connection.getProtocolHandler().syncWrite(channelFlowFrame, ChannelFlowOkBody.class);
+        AMQFrame channelFlowFrame = body.generateFrame(getChannelId());
+        getAMQConnection().getProtocolHandler().syncWrite(channelFlowFrame, ChannelFlowOkBody.class);
     }
 
     public BasicMessageConsumer_0_8 createMessageConsumer(final AMQDestination destination, final int prefetchHigh,
@@ -464,18 +442,18 @@ public class AMQSession_0_8 extends AMQS
     {
 
         final AMQProtocolHandler protocolHandler = getProtocolHandler();
-       return new BasicMessageConsumer_0_8(_channelId, _connection, destination, messageSelector, noLocal,
-                                 _messageFactoryRegistry,this, protocolHandler, arguments, prefetchHigh, prefetchLow,
-                                 exclusive, _acknowledgeMode, noConsume, autoClose);
+       return new BasicMessageConsumer_0_8(getChannelId(), getAMQConnection(), destination, messageSelector, noLocal,
+               getMessageFactoryRegistry(),this, protocolHandler, arguments, prefetchHigh, prefetchLow,
+                                 exclusive, getAcknowledgeMode(), noConsume, autoClose);
     }
 
 
-    public BasicMessageProducer_0_8 createMessageProducer(final Destination destination, final boolean mandatory,
-            final boolean immediate, long producerId) throws JMSException
+    public BasicMessageProducer_0_8 createMessageProducer(final Destination destination, final Boolean mandatory,
+            final Boolean immediate, long producerId) throws JMSException
     {
        try
        {
-           return new BasicMessageProducer_0_8(_connection, (AMQDestination) destination, _transacted, _channelId,
+           return new BasicMessageProducer_0_8(getAMQConnection(), (AMQDestination) destination, isTransacted(), getChannelId(),
                                  this, getProtocolHandler(), producerId, immediate, mandatory);
        }
        catch (AMQException e)
@@ -505,7 +483,7 @@ public class AMQSession_0_8 extends AMQS
 
     private void returnBouncedMessage(final ReturnMessage msg)
     {
-        _connection.performConnectionTask(new Runnable()
+        getAMQConnection().performConnectionTask(new Runnable()
         {
             public void run()
             {
@@ -513,8 +491,8 @@ public class AMQSession_0_8 extends AMQS
                 {
                     // Bounced message is processed here, away from the mina thread
                     AbstractJMSMessage bouncedMessage =
-                            _messageFactoryRegistry.createMessage(0, false, msg.getExchange(),
-                                                                  msg.getRoutingKey(), msg.getContentHeader(), msg.getBodies(),_queueDestinationCache,_topicDestinationCache);
+                            getMessageFactoryRegistry().createMessage(0, false, msg.getExchange(),
+                                    msg.getRoutingKey(), msg.getContentHeader(), msg.getBodies(), _queueDestinationCache, _topicDestinationCache);
                     AMQConstant errorCode = AMQConstant.getConstant(msg.getReplyCode());
                     AMQShortString reason = msg.getReplyText();
                     _logger.debug("Message returned with error code " + errorCode + " (" + reason + ")");
@@ -522,20 +500,17 @@ public class AMQSession_0_8 extends AMQS
                     // @TODO should this be moved to an exception handler of sorts. Somewhere errors are converted to correct execeptions.
                     if (errorCode == AMQConstant.NO_CONSUMERS)
                     {
-                        _connection.exceptionReceived(new AMQNoConsumersException("Error: " + reason, bouncedMessage, null));
-                    }
-                    else if (errorCode == AMQConstant.NO_ROUTE)
+                        getAMQConnection().exceptionReceived(new AMQNoConsumersException("Error: " + reason, bouncedMessage, null));
+                    } else if (errorCode == AMQConstant.NO_ROUTE)
                     {
-                        _connection.exceptionReceived(new AMQNoRouteException("Error: " + reason, bouncedMessage, null));
-                    }
-                    else
+                        getAMQConnection().exceptionReceived(new AMQNoRouteException("Error: " + reason, bouncedMessage, null));
+                    } else
                     {
-                        _connection.exceptionReceived(
+                        getAMQConnection().exceptionReceived(
                                 new AMQUndeliveredException(errorCode, "Error: " + reason, bouncedMessage, null));
                     }
 
-                }
-                catch (Exception e)
+                } catch (Exception e)
                 {
                     _logger.error(
                             "Caught exception trying to raise undelivered message exception (dump follows) - ignoring...",
@@ -571,7 +546,7 @@ public class AMQSession_0_8 extends AMQS
                   
                         return null;
                     }
-                 }, _connection).execute();
+                 }, getAMQConnection()).execute();
     }
 
     public DestinationCache<AMQQueue> getQueueDestinationCache()
@@ -607,9 +582,18 @@ public class AMQSession_0_8 extends AMQS
             return matches;
         }
 
+        public long getMessageCount()
+        {
+            return _messageCount;
+        }
+
+        public long getConsumerCount()
+        {
+            return _consumerCount;
+        }
     }
 
-    protected Long requestQueueDepth(AMQDestination amqd) throws AMQException, FailoverException
+    protected Long requestQueueDepth(AMQDestination amqd, boolean sync) throws AMQException, FailoverException
     {
         AMQFrame queueDeclare =
             getMethodRegistry().createQueueDeclareBody(getTicket(),
@@ -619,10 +603,10 @@ public class AMQSession_0_8 extends AMQS
                                                        amqd.isExclusive(),
                                                        amqd.isAutoDelete(),
                                                        false,
-                                                       null).generateFrame(_channelId);
+                                                       null).generateFrame(getChannelId());
         QueueDeclareOkHandler okHandler = new QueueDeclareOkHandler();
         getProtocolHandler().writeCommandFrameAndWaitForReply(queueDeclare, okHandler);        
-        return okHandler._messageCount;
+        return okHandler.getMessageCount();
     }
 
     protected boolean tagLE(long tag1, long tag2)
@@ -647,6 +631,7 @@ public class AMQSession_0_8 extends AMQS
 
     public void handleAddressBasedDestination(AMQDestination dest, 
                                               boolean isConsumer,
+                                              boolean noLocal,
                                               boolean noWait) throws AMQException
     {
         throw new UnsupportedOperationException("The new addressing based sytanx is "
@@ -683,7 +668,7 @@ public class AMQSession_0_8 extends AMQS
     {
         // if the Connection has closed then we should throw any exception that
         // has occurred that we were not waiting for
-        AMQStateManager manager = _connection.getProtocolHandler()
+        AMQStateManager manager = getAMQConnection().getProtocolHandler()
                 .getStateManager();
         
         Exception e = manager.getLastException();

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryQueue.java?rev=1299257&r1=1299256&r2=1299257&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryQueue.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryQueue.java Sat Mar 10 19:22:10 2012
@@ -20,12 +20,11 @@
  */
 package org.apache.qpid.client;
 
-import java.util.UUID;
+import org.apache.qpid.framing.AMQShortString;
 
 import javax.jms.JMSException;
 import javax.jms.TemporaryQueue;
-
-import org.apache.qpid.framing.AMQShortString;
+import java.util.UUID;
 
 /** AMQ implementation of a TemporaryQueue. */
 final class AMQTemporaryQueue extends AMQQueue implements TemporaryQueue, TemporaryDestination

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java?rev=1299257&r1=1299256&r2=1299257&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java Sat Mar 10 19:22:10 2012
@@ -20,17 +20,16 @@
  */
 package org.apache.qpid.client;
 
-import java.net.URISyntaxException;
-
-import javax.jms.InvalidDestinationException;
-import javax.jms.JMSException;
-import javax.jms.Topic;
-
 import org.apache.qpid.exchange.ExchangeDefaults;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.messaging.Address;
 import org.apache.qpid.url.BindingURL;
 
+import javax.jms.InvalidDestinationException;
+import javax.jms.JMSException;
+import javax.jms.Topic;
+import java.net.URISyntaxException;
+
 public class AMQTopic extends AMQDestination implements Topic
 {
     public AMQTopic(String address) throws URISyntaxException
@@ -175,7 +174,7 @@ public class AMQTopic extends AMQDestina
         }
         else
         {
-            return _exchangeName;
+            return super.getExchangeName();
         }
     }
 

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopicSessionAdaptor.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopicSessionAdaptor.java?rev=1299257&r1=1299256&r2=1299257&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopicSessionAdaptor.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopicSessionAdaptor.java Sat Mar 10 19:22:10 2012
@@ -20,182 +20,30 @@
  */
 package org.apache.qpid.client;
 
-import java.io.Serializable;
-
-import javax.jms.BytesMessage;
-import javax.jms.Destination;
+import javax.jms.*;
 import javax.jms.IllegalStateException;
-import javax.jms.JMSException;
-import javax.jms.MapMessage;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.ObjectMessage;
-import javax.jms.Queue;
-import javax.jms.QueueBrowser;
-import javax.jms.Session;
-import javax.jms.StreamMessage;
-import javax.jms.TemporaryQueue;
-import javax.jms.TemporaryTopic;
-import javax.jms.TextMessage;
-import javax.jms.Topic;
-import javax.jms.TopicPublisher;
-import javax.jms.TopicSession;
-import javax.jms.TopicSubscriber;
 
-public class AMQTopicSessionAdaptor implements TopicSession, AMQSessionAdapter
+class AMQTopicSessionAdaptor extends AMQSessionAdapter<TopicSession> implements TopicSession
 {
-    protected final AMQSession _session;
-
-    public AMQTopicSessionAdaptor(Session session)
-    {
-        _session = (AMQSession) session;
-    }
 
-    public Topic createTopic(String string) throws JMSException
+    public AMQTopicSessionAdaptor(TopicSession session)
     {
-        return _session.createTopic(string);
+        super(session);
     }
 
     public TopicSubscriber createSubscriber(Topic topic) throws JMSException
     {
-        return _session.createSubscriber(topic);
+        return getSession().createSubscriber(topic);
     }
 
     public TopicSubscriber createSubscriber(Topic topic, String string, boolean b) throws JMSException
     {
-        return _session.createSubscriber(topic, string, b);
-    }
-
-    public TopicSubscriber createDurableSubscriber(Topic topic, String string) throws JMSException
-    {
-        return _session.createDurableSubscriber(topic, string);
-    }
-
-    public TopicSubscriber createDurableSubscriber(Topic topic, String string, String string1, boolean b) throws JMSException
-    {
-        return _session.createDurableSubscriber(topic, string, string1, b);
+        return getSession().createSubscriber(topic, string, b);
     }
 
     public TopicPublisher createPublisher(Topic topic) throws JMSException
     {
-        return _session.createPublisher(topic);
-    }
-
-    public TemporaryTopic createTemporaryTopic() throws JMSException
-    {
-        return _session.createTemporaryTopic();
-    }
-
-    public void unsubscribe(String string) throws JMSException
-    {
-        _session.unsubscribe(string);
-    }
-
-    public BytesMessage createBytesMessage() throws JMSException
-    {
-        return _session.createBytesMessage();
-    }
-
-    public MapMessage createMapMessage() throws JMSException
-    {
-        return _session.createMapMessage();
-    }
-
-    public Message createMessage() throws JMSException
-    {
-        return _session.createMessage();
-    }
-
-    public ObjectMessage createObjectMessage() throws JMSException
-    {
-        return _session.createObjectMessage();
-    }
-
-    public ObjectMessage createObjectMessage(Serializable serializable) throws JMSException
-    {
-        return _session.createObjectMessage(serializable);
-    }
-
-    public StreamMessage createStreamMessage() throws JMSException
-    {
-        return _session.createStreamMessage();
-    }
-
-    public TextMessage createTextMessage() throws JMSException
-    {
-        return _session.createTextMessage();
-    }
-
-    public TextMessage createTextMessage(String string) throws JMSException
-    {
-        return _session.createTextMessage(string);
-    }
-
-    public boolean getTransacted() throws JMSException
-    {
-        return _session.getTransacted();
-    }
-
-    public int getAcknowledgeMode() throws JMSException
-    {
-        return _session.getAcknowledgeMode();
-    }
-
-    public void commit() throws JMSException
-    {
-        _session.commit();
-    }
-
-    public void rollback() throws JMSException
-    {
-        _session.rollback();
-    }
-
-    public void close() throws JMSException
-    {
-        _session.close();
-    }
-
-    public void recover() throws JMSException
-    {
-        _session.recover();
-    }
-
-    public MessageListener getMessageListener() throws JMSException
-    {
-        return _session.getMessageListener();
-    }
-
-    public void setMessageListener(MessageListener messageListener) throws JMSException
-    {
-        _session.setMessageListener(messageListener);
-    }
-
-    public void run()
-    {
-        _session.run();
-    }
-
-    public MessageProducer createProducer(Destination destination) throws JMSException
-    {
-        return _session.createProducer(destination);
-    }
-
-    public MessageConsumer createConsumer(Destination destination) throws JMSException
-    {
-        return _session.createConsumer(destination);
-    }
-
-    public MessageConsumer createConsumer(Destination destination, String string) throws JMSException
-    {
-        return _session.createConsumer(destination, string);
-    }
-
-    public MessageConsumer createConsumer(Destination destination, String string, boolean b) throws JMSException
-    {
-        return _session.createConsumer(destination, string, b);
+        return getSession().createPublisher(topic);
     }
 
     //The following methods cannot be called from a TopicSession as per JMS spec
@@ -219,8 +67,4 @@ public class AMQTopicSessionAdaptor impl
         throw new IllegalStateException("Cannot call createTemporaryQueue from TopicSession");
     }
 
-    public AMQSession getSession()
-    {
-        return _session;
-    }
 }

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?rev=1299257&r1=1299256&r2=1299257&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java Sat Mar 10 19:22:10 2012
@@ -20,29 +20,35 @@
  */
 package org.apache.qpid.client;
 
-import org.apache.qpid.AMQInternalException;
-import org.apache.qpid.filter.JMSSelectorFilter;
-import org.apache.qpid.filter.MessageFilter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQInternalException;
 import org.apache.qpid.client.failover.FailoverException;
-import org.apache.qpid.client.message.*;
+import org.apache.qpid.client.filter.MessageFilter;
+import org.apache.qpid.client.message.AMQMessageDelegateFactory;
+import org.apache.qpid.client.message.AbstractJMSMessage;
+import org.apache.qpid.client.message.CloseConsumerMessage;
+import org.apache.qpid.client.message.MessageFactoryRegistry;
 import org.apache.qpid.client.protocol.AMQProtocolHandler;
 import org.apache.qpid.common.AMQPFilterTypes;
-import org.apache.qpid.framing.*;
+import org.apache.qpid.client.filter.JMSSelectorFilter;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.FieldTableFactory;
 import org.apache.qpid.jms.MessageConsumer;
 import org.apache.qpid.jms.Session;
 import org.apache.qpid.transport.TransportException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import javax.jms.InvalidSelectorException;
 import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageListener;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
-import java.util.ArrayList;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -54,14 +60,13 @@ public abstract class BasicMessageConsum
 {
     private static final Logger _logger = LoggerFactory.getLogger(BasicMessageConsumer.class);
 
-    /** The connection being used by this consumer */
-    protected final AMQConnection _connection;
+    private final AMQConnection _connection;
 
-    protected final MessageFilter _messageSelectorFilter;
+    private final MessageFilter _messageSelectorFilter;
 
     private final boolean _noLocal;
 
-    protected AMQDestination _destination;
+    private AMQDestination _destination;
 
     /**
      * When true indicates that a blocking receive call is in progress
@@ -72,23 +77,17 @@ public abstract class BasicMessageConsum
      */
     private final AtomicReference<MessageListener> _messageListener = new AtomicReference<MessageListener>();
 
-    /** The consumer tag allows us to close the consumer by sending a jmsCancel method to the broker */
-    protected int _consumerTag;
+    private int _consumerTag;
 
-    /** We need to know the channel id when constructing frames */
-    protected final int _channelId;
+    private final int _channelId;
 
-    /**
-     * Used in the blocking receive methods to receive a message from the Session thread. <p/> Or to notify of errors
-     * <p/> Argument true indicates we want strict FIFO semantics
-     */
-    protected final BlockingQueue _synchronousQueue;
+    private final BlockingQueue _synchronousQueue;
 
-    protected final MessageFactoryRegistry _messageFactory;
+    private final MessageFactoryRegistry _messageFactory;
 
-    protected final AMQSession _session;
+    private final AMQSession _session;
 
-    protected final AMQProtocolHandler _protocolHandler;
+    private final AMQProtocolHandler _protocolHandler;
 
     /**
      * We need to store the "raw" field table so that we can resubscribe in the event of failover being required
@@ -107,17 +106,9 @@ public abstract class BasicMessageConsum
      */
     private final int _prefetchLow;
 
-    /**
-     * We store the exclusive field in order to be able to reuse it when resubscribing in the event of failover
-     */
-    protected boolean _exclusive;
+    private boolean _exclusive;
 
-    /**
-     * The acknowledge mode in force for this consumer. Note that the AMQP protocol allows different ack modes per
-     * consumer whereas JMS defines this at the session level, hence why we associate it with the consumer in our
-     * implementation.
-     */
-    protected final int _acknowledgeMode;
+    private final int _acknowledgeMode;
 
     /**
      * List of tags delievered, The last of which which should be acknowledged on commit in transaction mode.
@@ -208,6 +199,10 @@ public abstract class BasicMessageConsum
         // possible to determine  when querying the broker whether there are no arguments or just a non-matching selector
         // argument, as specifying null for the arguments when querying means they should not be checked at all
         ft.put(AMQPFilterTypes.JMS_SELECTOR.getValue(), messageSelector == null ? "" : messageSelector);
+        if(noLocal)
+        {
+            ft.put(AMQPFilterTypes.NO_LOCAL.getValue(), noLocal);
+        }
 
         _arguments = ft;
 
@@ -232,6 +227,11 @@ public abstract class BasicMessageConsum
         return _messageListener.get();
     }
 
+    /**
+     * The acknowledge mode in force for this consumer. Note that the AMQP protocol allows different ack modes per
+     * consumer whereas JMS defines this at the session level, hence why we associate it with the consumer in our
+     * implementation.
+     */
     public int getAcknowledgeMode()
     {
         return _acknowledgeMode;
@@ -279,7 +279,10 @@ public abstract class BasicMessageConsum
                 throw new javax.jms.IllegalStateException("Attempt to alter listener while session is started.");
             }
 
-            _logger.debug("Message listener set for destination " + _destination);
+            if (_logger.isDebugEnabled())
+            {
+            	_logger.debug("Message listener set for destination " + _destination);
+            }
 
             if (messageListener != null)
             {
@@ -371,6 +374,9 @@ public abstract class BasicMessageConsum
         return _noLocal;
     }
 
+    /**
+     * We store the exclusive field in order to be able to reuse it when resubscribing in the event of failover
+     */
     public boolean isExclusive()
     {
         return _exclusive;
@@ -537,7 +543,7 @@ public abstract class BasicMessageConsum
         }
         else if (o instanceof CloseConsumerMessage)
         {
-            _closed.set(true);
+            setClosed();
             deregisterConsumer();
             return null;
         }
@@ -554,14 +560,14 @@ public abstract class BasicMessageConsum
 
     public void close(boolean sendClose) throws JMSException
     {
-        if (_logger.isInfoEnabled())
+        if (_logger.isDebugEnabled())
         {
-            _logger.info("Closing consumer:" + debugIdentity());
+            _logger.debug("Closing consumer:" + debugIdentity());
         }
 
-        if (!_closed.getAndSet(true))
+        if (!setClosed())
         {
-            _closing.set(true);
+            setClosing(true);
             if (_logger.isDebugEnabled())
             {
                 StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
@@ -607,12 +613,8 @@ public abstract class BasicMessageConsum
             }
             else
             {
-            	// FIXME: wow this is ugly
-                // //fixme this probably is not right
-                // if (!isNoConsume())
-                { // done in BasicCancelOK Handler but not sending one so just deregister.
-                    deregisterConsumer();
-                }
+            	// FIXME?
+                deregisterConsumer();
             }
 
             // This will occur if session.close is called closing all consumers we may be blocked waiting for a receive
@@ -641,7 +643,7 @@ public abstract class BasicMessageConsum
     {
         // synchronized (_closed)
         {
-            _closed.set(true);
+            setClosed();
 
             if (_logger.isDebugEnabled())
             {
@@ -818,7 +820,7 @@ public abstract class BasicMessageConsum
     {
         // synchronized (_closed)
         {
-            _closed.set(true);
+            setClosed();
             if (_logger.isDebugEnabled())
             {
                 StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
@@ -859,6 +861,7 @@ public abstract class BasicMessageConsum
         _session.deregisterConsumer(this);
     }
 
+    /** The consumer tag allows us to close the consumer by sending a jmsCancel method to the broker */
     public int getConsumerTag()
     {
         return _consumerTag;
@@ -1002,10 +1005,44 @@ public abstract class BasicMessageConsum
     public void failedOverPre()
     {
         clearReceiveQueue();
-        // TGM FIXME: think this should just be removed
-        // clearUnackedMessages();
     }
 
     public void failedOverPost() {}
 
+    /** The connection being used by this consumer */
+    protected AMQConnection getConnection()
+    {
+        return _connection;
+    }
+
+    protected void setDestination(AMQDestination destination)
+    {
+        _destination = destination;
+    }
+
+    /** We need to know the channel id when constructing frames */
+    protected int getChannelId()
+    {
+        return _channelId;
+    }
+
+    /**
+     * Used in the blocking receive methods to receive a message from the Session thread. <p/> Or to notify of errors
+     * <p/> Argument true indicates we want strict FIFO semantics
+     */
+    protected BlockingQueue getSynchronousQueue()
+    {
+        return _synchronousQueue;
+    }
+
+    protected MessageFactoryRegistry getMessageFactory()
+    {
+        return _messageFactory;
+    }
+
+    protected AMQProtocolHandler getProtocolHandler()
+    {
+        return _protocolHandler;
+    }
+
 }

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java?rev=1299257&r1=1299256&r2=1299257&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java Sat Mar 10 19:22:10 2012
@@ -19,21 +19,32 @@ package org.apache.qpid.client;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
+import org.apache.qpid.AMQException;
 import org.apache.qpid.client.AMQDestination.AddressOption;
 import org.apache.qpid.client.failover.FailoverException;
-import org.apache.qpid.client.message.*;
+import org.apache.qpid.client.message.AMQMessageDelegateFactory;
+import org.apache.qpid.client.message.AMQMessageDelegate_0_10;
+import org.apache.qpid.client.message.AbstractJMSMessage;
+import org.apache.qpid.client.message.MessageFactoryRegistry;
+import org.apache.qpid.client.message.UnprocessedMessage_0_10;
 import org.apache.qpid.client.protocol.AMQProtocolHandler;
 import org.apache.qpid.common.ServerPropertyNames;
 import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.transport.*;
 import org.apache.qpid.jms.Session;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.transport.Acquired;
+import org.apache.qpid.transport.MessageCreditUnit;
+import org.apache.qpid.transport.Option;
+import org.apache.qpid.transport.Range;
+import org.apache.qpid.transport.RangeSet;
+import org.apache.qpid.transport.RangeSetFactory;
+import org.apache.qpid.transport.SessionException;
+import org.apache.qpid.transport.TransportException;
 
 import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageListener;
-
 import java.util.Iterator;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -46,7 +57,7 @@ public class BasicMessageConsumer_0_10 e
     /**
      * This class logger
      */
-    protected final Logger _logger = LoggerFactory.getLogger(getClass());
+    private final Logger _logger = LoggerFactory.getLogger(getClass());
 
     /**
      * The underlying QpidSession
@@ -67,7 +78,7 @@ public class BasicMessageConsumer_0_10 e
     private final long _capacity;
 
     /** Flag indicating if the server supports message selectors */
-    protected final boolean _serverJmsSelectorSupport;
+    private final boolean _serverJmsSelectorSupport;
 
     protected BasicMessageConsumer_0_10(int channelId, AMQConnection connection, AMQDestination destination,
                                         String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory,
@@ -80,11 +91,10 @@ public class BasicMessageConsumer_0_10 e
                 rawSelector, prefetchHigh, prefetchLow, exclusive, acknowledgeMode, browseOnly, autoClose);
         _0_10session = (AMQSession_0_10) session;
 
-        _preAcquire = evaluatePreAcquire(browseOnly, destination);
-
-        _capacity = evaluateCapacity(destination);
         _serverJmsSelectorSupport = connection.isSupportedServerFeature(ServerPropertyNames.FEATURE_QPID_JMS_SELECTOR);
+        _preAcquire = evaluatePreAcquire(browseOnly, destination, _serverJmsSelectorSupport);
 
+        _capacity = evaluateCapacity(destination);
 
         if (destination.isAddressResolved() && AMQDestination.TOPIC_TYPE == destination.getAddressType()) 
         {            
@@ -92,8 +102,8 @@ public class BasicMessageConsumer_0_10 e
             
             if (!namedQueue)
             {
-                _destination = destination.copyDestination();
-                _destination.setQueueName(null);
+                setDestination(destination.copyDestination());
+                getDestination().setQueueName(null);
             }
         }
     }
@@ -181,14 +191,14 @@ public class BasicMessageConsumer_0_10 e
     {
         super.preDeliver(jmsMsg);
 
-        if (_acknowledgeMode == org.apache.qpid.jms.Session.NO_ACKNOWLEDGE)
+        if (getAcknowledgeMode() == org.apache.qpid.jms.Session.NO_ACKNOWLEDGE)
         {
             //For 0-10 we need to ensure that all messages are indicated processed in some way to
             //ensure their AMQP command-id is marked completed, and so we must send a completion
             //even for no-ack messages even though there isnt actually an 'acknowledgement' occurring.
             //Add message to the unacked message list to ensure we dont lose record of it before
             //sending a completion of some sort.
-            _session.addUnacknowledgedMessage(jmsMsg.getDeliveryTag());
+            getSession().addUnacknowledgedMessage(jmsMsg.getDeliveryTag());
         }
     }
 
@@ -196,7 +206,7 @@ public class BasicMessageConsumer_0_10 e
             AMQMessageDelegateFactory delegateFactory, UnprocessedMessage_0_10 msg) throws Exception
     {
         AMQMessageDelegate_0_10.updateExchangeTypeMapping(msg.getMessageTransfer().getHeader(), ((AMQSession_0_10)getSession()).getQpidSession());
-        return _messageFactory.createMessage(msg.getMessageTransfer());
+        return getMessageFactory().createMessage(msg.getMessageTransfer());
     }
 
     /**
@@ -211,9 +221,9 @@ public class BasicMessageConsumer_0_10 e
         boolean messageOk = true;
         try
         {
-            if (_messageSelectorFilter != null && !_serverJmsSelectorSupport)
+            if (!_serverJmsSelectorSupport && getMessageSelectorFilter() != null)
             {
-                messageOk = _messageSelectorFilter.matches(message);
+                messageOk = getMessageSelectorFilter().matches(message);
             }
         }
         catch (Exception e)
@@ -274,7 +284,7 @@ public class BasicMessageConsumer_0_10 e
     {
         _0_10session.messageAcknowledge
             (Range.newInstance((int) message.getDeliveryTag()),
-             _acknowledgeMode != org.apache.qpid.jms.Session.NO_ACKNOWLEDGE);
+             getAcknowledgeMode() != org.apache.qpid.jms.Session.NO_ACKNOWLEDGE);
 
         final AMQException amqe = _0_10session.getCurrentException();
         if (amqe != null)
@@ -338,20 +348,20 @@ public class BasicMessageConsumer_0_10 e
             {
                 messageFlow();
             }
-            if (messageListener != null && !_synchronousQueue.isEmpty())
+            if (messageListener != null && !getSynchronousQueue().isEmpty())
             {
-                Iterator messages=_synchronousQueue.iterator();
+                Iterator messages= getSynchronousQueue().iterator();
                 while (messages.hasNext())
                 {
                     AbstractJMSMessage message=(AbstractJMSMessage) messages.next();
                     messages.remove();
-                    _session.rejectMessage(message, true);
+                    getSession().rejectMessage(message, true);
                 }
             }
         }
         catch(TransportException e)
         {
-            throw _session.toJMSException("Exception while setting message listener:"+ e.getMessage(), e);
+            throw getSession().toJMSException("Exception while setting message listener:" + e.getMessage(), e);
         }
     }
 
@@ -378,7 +388,7 @@ public class BasicMessageConsumer_0_10 e
         {
             _syncReceive.set(true);
         }
-        if (_0_10session.isStarted() && _capacity == 0 && _synchronousQueue.isEmpty())
+        if (_0_10session.isStarted() && _capacity == 0 && getSynchronousQueue().isEmpty())
         {
             messageFlow();
         }
@@ -415,19 +425,19 @@ public class BasicMessageConsumer_0_10 e
     {
         super.postDeliver(msg);
 
-        switch (_acknowledgeMode)
+        switch (getAcknowledgeMode())
         {
             case Session.SESSION_TRANSACTED:
                 _0_10session.sendTxCompletionsIfNecessary();
                 break;
             case Session.NO_ACKNOWLEDGE:
-                if (!_session.isInRecovery())
+                if (!getSession().isInRecovery())
                 {
-                  _session.acknowledgeMessage(msg.getDeliveryTag(), false);
+                  getSession().acknowledgeMessage(msg.getDeliveryTag(), false);
                 }
                 break;
             case Session.AUTO_ACKNOWLEDGE:
-                if (!_session.isInRecovery() && _session.getAMQConnection().getSyncAck())
+                if (!getSession().isInRecovery() && getSession().getAMQConnection().getSyncAck())
                 {
                     ((AMQSession_0_10) getSession()).getQpidSession().sync();
                 }
@@ -443,10 +453,10 @@ public class BasicMessageConsumer_0_10 e
 
     @Override public void rollbackPendingMessages()
     {
-        if (_synchronousQueue.size() > 0)
+        if (getSynchronousQueue().size() > 0)
         {
             RangeSet ranges = RangeSetFactory.createRangeSet();
-            Iterator iterator = _synchronousQueue.iterator();
+            Iterator iterator = getSynchronousQueue().iterator();
             while (iterator.hasNext())
             {
 
@@ -486,7 +496,7 @@ public class BasicMessageConsumer_0_10 e
         }
         else
         {
-            return _exclusive;
+            return super.isExclusive();
         }
     }
     
@@ -514,7 +524,7 @@ public class BasicMessageConsumer_0_10 e
         return _preAcquire;
     }
 
-    private boolean evaluatePreAcquire(boolean browseOnly, AMQDestination destination)
+    private boolean evaluatePreAcquire(boolean browseOnly, AMQDestination destination, boolean serverJmsSelectorSupport)
     {
         boolean preAcquire;
         if (browseOnly)
@@ -524,7 +534,7 @@ public class BasicMessageConsumer_0_10 e
         else
         {
             boolean isQueue = (destination instanceof AMQQueue || getDestination().getAddressType() == AMQDestination.QUEUE_TYPE);
-            if (isQueue && getMessageSelectorFilter() != null)
+            if (!serverJmsSelectorSupport && isQueue && getMessageSelectorFilter() != null)
             {
                 preAcquire = false;
             }

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java?rev=1299257&r1=1299256&r2=1299257&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java Sat Mar 10 19:22:10 2012
@@ -20,24 +20,31 @@
  */
 package org.apache.qpid.client;
 
-import javax.jms.JMSException;
-import javax.jms.Message;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import org.apache.qpid.AMQException;
 import org.apache.qpid.client.failover.FailoverException;
-import org.apache.qpid.client.message.*;
+import org.apache.qpid.client.message.AMQMessageDelegateFactory;
+import org.apache.qpid.client.message.AbstractJMSMessage;
+import org.apache.qpid.client.message.MessageFactoryRegistry;
+import org.apache.qpid.client.message.UnprocessedMessage_0_8;
 import org.apache.qpid.client.protocol.AMQProtocolHandler;
 import org.apache.qpid.common.AMQPFilterTypes;
 import org.apache.qpid.configuration.ClientProperties;
-import org.apache.qpid.framing.*;
+import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.BasicCancelBody;
+import org.apache.qpid.framing.BasicCancelOkBody;
+import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.jms.ConnectionURL;
-import org.apache.qpid.url.BindingURL;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
 
 public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<UnprocessedMessage_0_8>
 {
-    protected final Logger _logger = LoggerFactory.getLogger(getClass());
+    private final Logger _logger = LoggerFactory.getLogger(getClass());
     private AMQSession_0_8.DestinationCache<AMQTopic> _topicDestinationCache;
     private AMQSession_0_8.DestinationCache<AMQQueue> _queueDestinationCache;
 
@@ -88,11 +95,11 @@ public class BasicMessageConsumer_0_8 ex
 
     void sendCancel() throws AMQException, FailoverException
     {
-        BasicCancelBody body = getSession().getMethodRegistry().createBasicCancelBody(new AMQShortString(String.valueOf(_consumerTag)), false);
+        BasicCancelBody body = getSession().getMethodRegistry().createBasicCancelBody(new AMQShortString(String.valueOf(getConsumerTag())), false);
 
-        final AMQFrame cancelFrame = body.generateFrame(_channelId);
+        final AMQFrame cancelFrame = body.generateFrame(getChannelId());
 
-        _protocolHandler.syncWrite(cancelFrame, BasicCancelOkBody.class);
+        getProtocolHandler().syncWrite(cancelFrame, BasicCancelOkBody.class);
 
         if (_logger.isDebugEnabled())
         {
@@ -103,9 +110,9 @@ public class BasicMessageConsumer_0_8 ex
     public AbstractJMSMessage createJMSMessageFromUnprocessedMessage(AMQMessageDelegateFactory delegateFactory, UnprocessedMessage_0_8 messageFrame)throws Exception
     {
 
-        return _messageFactory.createMessage(messageFrame.getDeliveryTag(),
-                                             messageFrame.isRedelivered(), messageFrame.getExchange(),
-                                             messageFrame.getRoutingKey(), messageFrame.getContentHeader(), messageFrame.getBodies(),
+        return getMessageFactory().createMessage(messageFrame.getDeliveryTag(),
+                messageFrame.isRedelivered(), messageFrame.getExchange(),
+                messageFrame.getRoutingKey(), messageFrame.getContentHeader(), messageFrame.getBodies(),
                 _queueDestinationCache, _topicDestinationCache);
 
     }

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java?rev=1299257&r1=1299256&r2=1299257&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java Sat Mar 10 19:22:10 2012
@@ -22,7 +22,6 @@ package org.apache.qpid.client;
 
 import java.io.UnsupportedEncodingException;
 import java.util.UUID;
-
 import javax.jms.BytesMessage;
 import javax.jms.DeliveryMode;
 import javax.jms.Destination;
@@ -33,12 +32,11 @@ import javax.jms.Message;
 import javax.jms.ObjectMessage;
 import javax.jms.StreamMessage;
 import javax.jms.TextMessage;
-
+import javax.jms.Topic;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.client.message.AbstractJMSMessage;
 import org.apache.qpid.client.message.MessageConverter;
 import org.apache.qpid.client.protocol.AMQProtocolHandler;
-import org.apache.qpid.framing.ContentBody;
 import org.apache.qpid.transport.TransportException;
 import org.apache.qpid.util.UUIDGen;
 import org.apache.qpid.util.UUIDs;
@@ -49,14 +47,11 @@ public abstract class BasicMessageProduc
 {
     enum PublishMode { ASYNC_PUBLISH_ALL, SYNC_PUBLISH_PERSISTENT, SYNC_PUBLISH_ALL };
 
-    protected final Logger _logger = LoggerFactory.getLogger(getClass());
+    private final Logger _logger ;
 
     private AMQConnection _connection;
 
-    /**
-     * If true, messages will not get a timestamp.
-     */
-    protected boolean _disableTimestamps;
+    private boolean _disableTimestamps;
 
     /**
      * Priority of messages created by this producer.
@@ -73,10 +68,7 @@ public abstract class BasicMessageProduc
      */
     private int _deliveryMode = DeliveryMode.PERSISTENT;
 
-    /**
-     * The Destination used for this consumer, if specified upon creation.
-     */
-    protected AMQDestination _destination;
+    private AMQDestination _destination;
 
     /**
      * Default encoding used for messages produced by this producer.
@@ -88,14 +80,14 @@ public abstract class BasicMessageProduc
      */
     private String _mimeType;
 
-    protected AMQProtocolHandler _protocolHandler;
+    private AMQProtocolHandler _protocolHandler;
 
     /**
      * True if this producer was created from a transacted session
      */
     private boolean _transacted;
 
-    protected int _channelId;
+    private int _channelId;
 
     /**
      * This is an id generated by the session and is used to tie individual producers to the session. This means we
@@ -105,29 +97,49 @@ public abstract class BasicMessageProduc
      */
     private long _producerId;
 
-    /**
-     * The session used to create this producer
-     */
-    protected AMQSession _session;
+    private AMQSession _session;
 
     private final boolean _immediate;
 
-    private final boolean _mandatory;
+    private final Boolean _mandatory;
 
     private boolean _disableMessageId;
 
     private UUIDGen _messageIdGenerator = UUIDs.newGenerator();
 
-    protected String _userID;  // ref user id used in the connection.
+    private String _userID;  // ref user id used in the connection.
 
-    private static final ContentBody[] NO_CONTENT_BODIES = new ContentBody[0];
 
-    protected PublishMode publishMode = PublishMode.ASYNC_PUBLISH_ALL;
+    /**
+     * The default value for immediate flag used this producer is false. That is, a consumer does
+     * not need to be attached to a queue.
+     */
+    private final boolean _defaultImmediateValue = Boolean.parseBoolean(System.getProperty("qpid.default_immediate", "false"));
+
+    /**
+     * The default value for mandatory flag used by this producer is true. That is, server will not
+     * silently drop messages where no queue is connected to the exchange for the message.
+     */
+    private final boolean _defaultMandatoryValue = Boolean.parseBoolean(System.getProperty("qpid.default_mandatory", "true"));
 
-    protected BasicMessageProducer(AMQConnection connection, AMQDestination destination, boolean transacted, int channelId,
-                                   AMQSession session, AMQProtocolHandler protocolHandler, long producerId, boolean immediate, boolean mandatory) throws AMQException
+    /**
+     * The default value for mandatory flag used by this producer when publishing to a Topic is false. That is, server
+     * will silently drop messages where no queue is connected to the exchange for the message.
+     */
+    private final boolean _defaultMandatoryTopicValue =
+            Boolean.parseBoolean(System.getProperty("qpid.default_mandatory_topic",
+                    System.getProperties().containsKey("qpid.default_mandatory")
+                            ? System.getProperty("qpid.default_mandatory")
+                            : "false"));
+
+    private PublishMode publishMode = PublishMode.ASYNC_PUBLISH_ALL;
+
+    protected BasicMessageProducer(Logger logger,AMQConnection connection, AMQDestination destination, boolean transacted, int channelId,
+                                   AMQSession session, AMQProtocolHandler protocolHandler, long producerId,
+                                   Boolean immediate, Boolean mandatory) throws AMQException
     {
-        _connection = connection;
+    	_logger = logger;
+    	_connection = connection;
         _destination = destination;
         _transacted = transacted;
         _protocolHandler = protocolHandler;
@@ -139,8 +151,14 @@ public abstract class BasicMessageProduc
             declareDestination(destination);
         }
 
-        _immediate = immediate;
-        _mandatory = mandatory;
+        _immediate = immediate == null ? _defaultImmediateValue : immediate;
+        _mandatory = mandatory == null
+                ? destination == null ? null
+                                      : destination instanceof Topic
+                                            ? _defaultMandatoryTopicValue
+                                            : _defaultMandatoryValue
+                : mandatory;
+
         _userID = connection.getUsername();
         setPublishMode();
     }
@@ -161,7 +179,10 @@ public abstract class BasicMessageProduc
             publishMode = PublishMode.SYNC_PUBLISH_ALL;
         }
 
-        _logger.info("MessageProducer " + toString() + " using publish mode : " + publishMode);
+        if (_logger.isDebugEnabled())
+        {
+        	_logger.debug("MessageProducer " + toString() + " using publish mode : " + publishMode);
+        }
     }
 
     void resubscribe() throws AMQException
@@ -256,6 +277,14 @@ public abstract class BasicMessageProduc
         return _timeToLive;
     }
 
+    protected AMQDestination getAMQDestination()
+    {
+        return _destination;
+    }
+
+    /**
+     * The Destination used for this consumer, if specified upon creation.
+     */
     public Destination getDestination() throws JMSException
     {
         checkNotClosed();
@@ -265,7 +294,7 @@ public abstract class BasicMessageProduc
 
     public void close() throws JMSException
     {
-        _closed.set(true);
+        setClosed();
         _session.deregisterProducer(_producerId);
     }
 
@@ -319,7 +348,12 @@ public abstract class BasicMessageProduc
         synchronized (_connection.getFailoverMutex())
         {
             validateDestination(destination);
-            sendImpl((AMQDestination) destination, message, _deliveryMode, _messagePriority, _timeToLive, _mandatory,
+            sendImpl((AMQDestination) destination, message, _deliveryMode, _messagePriority, _timeToLive,
+                    _mandatory == null
+                            ? destination instanceof Topic
+                                ? _defaultMandatoryTopicValue
+                                : _defaultMandatoryValue
+                            : _mandatory,
                      _immediate);
         }
     }
@@ -332,7 +366,13 @@ public abstract class BasicMessageProduc
         synchronized (_connection.getFailoverMutex())
         {
             validateDestination(destination);
-            sendImpl((AMQDestination) destination, message, deliveryMode, priority, timeToLive, _mandatory, _immediate);
+            sendImpl((AMQDestination) destination, message, deliveryMode, priority, timeToLive,
+                    _mandatory == null
+                            ? destination instanceof Topic
+                                ? _defaultMandatoryTopicValue
+                                : _defaultMandatoryValue
+                            : _mandatory,
+                    _immediate);
         }
     }
 
@@ -480,7 +520,10 @@ public abstract class BasicMessageProduc
             _logger.debug("Updating original message");
             origMessage.setJMSPriority(message.getJMSPriority());
             origMessage.setJMSTimestamp(message.getJMSTimestamp());
-            _logger.debug("Setting JMSExpiration:" + message.getJMSExpiration());
+            if (_logger.isDebugEnabled())
+            {
+            	_logger.debug("Setting JMSExpiration:" + message.getJMSExpiration());
+            }
             origMessage.setJMSExpiration(message.getJMSExpiration());
             origMessage.setJMSMessageID(message.getJMSMessageID());
         }
@@ -564,6 +607,9 @@ public abstract class BasicMessageProduc
 
     }
 
+    /**
+     * The session used to create this producer
+     */
     public AMQSession getSession()
     {
         return _session;
@@ -580,4 +626,73 @@ public abstract class BasicMessageProduc
             throw getSession().toJMSException("Exception whilst checking destination binding:" + e.getMessage(), e);
         }
     }
+
+    /**
+     * If true, messages will not get a timestamp.
+     */
+    protected boolean isDisableTimestamps()
+    {
+        return _disableTimestamps;
+    }
+
+    protected void setDisableTimestamps(boolean disableTimestamps)
+    {
+        _disableTimestamps = disableTimestamps;
+    }
+
+    protected void setDestination(AMQDestination destination)
+    {
+        _destination = destination;
+    }
+
+    protected AMQProtocolHandler getProtocolHandler()
+    {
+        return _protocolHandler;
+    }
+
+    protected void setProtocolHandler(AMQProtocolHandler protocolHandler)
+    {
+        _protocolHandler = protocolHandler;
+    }
+
+    protected int getChannelId()
+    {
+        return _channelId;
+    }
+
+    protected void setChannelId(int channelId)
+    {
+        _channelId = channelId;
+    }
+
+    protected void setSession(AMQSession session)
+    {
+        _session = session;
+    }
+
+    protected String getUserID()
+    {
+        return _userID;
+    }
+
+    protected void setUserID(String userID)
+    {
+        _userID = userID;
+    }
+
+    protected PublishMode getPublishMode()
+    {
+        return publishMode;
+    }
+
+    protected void setPublishMode(PublishMode publishMode)
+    {
+        this.publishMode = publishMode;
+    }
+
+    Logger getLogger()
+    {
+        return _logger;
+    }
+
 }

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java?rev=1299257&r1=1299256&r2=1299257&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java Sat Mar 10 19:22:10 2012
@@ -17,18 +17,8 @@
  */
 package org.apache.qpid.client;
 
-import static org.apache.qpid.transport.Option.NONE;
-import static org.apache.qpid.transport.Option.SYNC;
-import static org.apache.qpid.transport.Option.UNRELIABLE;
-
-import java.nio.ByteBuffer;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.UUID;
-
-import javax.jms.DeliveryMode;
-import javax.jms.JMSException;
-import javax.jms.Message;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import org.apache.qpid.AMQException;
 import org.apache.qpid.client.AMQDestination.AddressOption;
@@ -48,8 +38,18 @@ import org.apache.qpid.transport.Message
 import org.apache.qpid.transport.Option;
 import org.apache.qpid.transport.TransportException;
 import org.apache.qpid.util.Strings;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+
+import static org.apache.qpid.transport.Option.NONE;
+import static org.apache.qpid.transport.Option.SYNC;
+import static org.apache.qpid.transport.Option.UNRELIABLE;
+
+import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
 
 /**
  * This is a 0_10 message producer.
@@ -61,11 +61,11 @@ public class BasicMessageProducer_0_10 e
 
     BasicMessageProducer_0_10(AMQConnection connection, AMQDestination destination, boolean transacted, int channelId,
                               AMQSession session, AMQProtocolHandler protocolHandler, long producerId,
-                              boolean immediate, boolean mandatory) throws AMQException
+                              Boolean immediate, Boolean mandatory) throws AMQException
     {
-        super(connection, destination, transacted, channelId, session, protocolHandler, producerId, immediate, mandatory);
+        super(_logger, connection, destination, transacted, channelId, session, protocolHandler, producerId, immediate, mandatory);
         
-        userIDBytes = Strings.toUTF8(_userID);
+        userIDBytes = Strings.toUTF8(getUserID());
     }
 
     void declareDestination(AMQDestination destination) throws AMQException
@@ -86,7 +86,7 @@ public class BasicMessageProducer_0_10 e
         {       
             try
             {
-                getSession().handleAddressBasedDestination(destination,false,false);
+                getSession().handleAddressBasedDestination(destination,false,false,false);
             }
             catch(Exception e)
             {
@@ -125,7 +125,7 @@ public class BasicMessageProducer_0_10 e
         }
 
         long currentTime = 0;
-        if (timeToLive > 0 || !_disableTimestamps)
+        if (timeToLive > 0 || !isDisableTimestamps())
         {
             currentTime = System.currentTimeMillis();
         }        
@@ -136,7 +136,7 @@ public class BasicMessageProducer_0_10 e
             message.setJMSExpiration(currentTime + timeToLive);
         }
         
-        if (!_disableTimestamps)
+        if (!isDisableTimestamps())
         {
             
             deliveryProp.setTimestamp(currentTime);            
@@ -213,8 +213,8 @@ public class BasicMessageProducer_0_10 e
             // if true, we need to sync the delivery of this message
             boolean sync = false;
 
-            sync = ( (publishMode == PublishMode.SYNC_PUBLISH_ALL) ||
-                     (publishMode == PublishMode.SYNC_PUBLISH_PERSISTENT && 
+            sync = ( (getPublishMode() == PublishMode.SYNC_PUBLISH_ALL) ||
+                     (getPublishMode() == PublishMode.SYNC_PUBLISH_PERSISTENT &&
                          deliveryMode == DeliveryMode.PERSISTENT)
                    );  
             
@@ -248,14 +248,14 @@ public class BasicMessageProducer_0_10 e
     @Override
     public boolean isBound(AMQDestination destination) throws JMSException
     {
-        return _session.isQueueBound(destination);
+        return getSession().isQueueBound(destination);
     }
     
     @Override
     public void close() throws JMSException
     {
         super.close();
-        AMQDestination dest = _destination;
+        AMQDestination dest = getAMQDestination();
         if (dest != null && dest.getDestSyntax() == AMQDestination.DestSyntax.ADDR)
         {
             if (dest.getDelete() == AddressOption.ALWAYS ||
@@ -264,7 +264,7 @@ public class BasicMessageProducer_0_10 e
                 try
                 {
                     ((AMQSession_0_10) getSession()).getQpidSession().queueDelete(
-                        _destination.getQueueName());
+                        getAMQDestination().getQueueName());
                 }
                 catch(TransportException e)
                 {

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java?rev=1299257&r1=1299256&r2=1299257&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java Sat Mar 10 19:22:10 2012
@@ -20,18 +20,9 @@
  */
 package org.apache.qpid.client;
 
-import java.util.UUID;
-
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.Topic;
-import javax.jms.Queue;
-
-import java.nio.ByteBuffer;
-
 import org.apache.qpid.AMQException;
-import org.apache.qpid.client.message.AbstractJMSMessage;
 import org.apache.qpid.client.message.AMQMessageDelegate_0_8;
+import org.apache.qpid.client.message.AbstractJMSMessage;
 import org.apache.qpid.client.protocol.AMQProtocolHandler;
 import org.apache.qpid.framing.AMQFrame;
 import org.apache.qpid.framing.BasicContentHeaderProperties;
@@ -42,13 +33,24 @@ import org.apache.qpid.framing.ContentHe
 import org.apache.qpid.framing.ExchangeDeclareBody;
 import org.apache.qpid.framing.MethodRegistry;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.Queue;
+import javax.jms.Topic;
+import java.nio.ByteBuffer;
+import java.util.UUID;
+
 public class BasicMessageProducer_0_8 extends BasicMessageProducer
 {
+	private static final Logger _logger = LoggerFactory.getLogger(BasicMessageProducer_0_8.class);
 
     BasicMessageProducer_0_8(AMQConnection connection, AMQDestination destination, boolean transacted, int channelId,
-            AMQSession session, AMQProtocolHandler protocolHandler, long producerId, boolean immediate, boolean mandatory) throws AMQException
+            AMQSession session, AMQProtocolHandler protocolHandler, long producerId, Boolean immediate, Boolean mandatory) throws AMQException
     {
-        super(connection, destination,transacted,channelId,session, protocolHandler, producerId, immediate, mandatory);
+        super(_logger,connection, destination,transacted,channelId,session, protocolHandler, producerId, immediate, mandatory);
     }
 
     void declareDestination(AMQDestination destination)
@@ -56,7 +58,7 @@ public class BasicMessageProducer_0_8 ex
 
         final MethodRegistry methodRegistry = getSession().getMethodRegistry();
         ExchangeDeclareBody body =
-                methodRegistry.createExchangeDeclareBody(_session.getTicket(),
+                methodRegistry.createExchangeDeclareBody(getSession().getTicket(),
                                                          destination.getExchangeName(),
                                                          destination.getExchangeClass(),
                                                          destination.getExchangeName().toString().startsWith("amq."),
@@ -68,29 +70,29 @@ public class BasicMessageProducer_0_8 ex
         // Declare the exchange
         // Note that the durable and internal arguments are ignored since passive is set to false
 
-        AMQFrame declare = body.generateFrame(_channelId);
+        AMQFrame declare = body.generateFrame(getChannelId());
 
-        _protocolHandler.writeFrame(declare);
+        getProtocolHandler().writeFrame(declare);
     }
 
     void sendMessage(AMQDestination destination, Message origMessage, AbstractJMSMessage message,
                      UUID messageId, int deliveryMode,int priority, long timeToLive, boolean mandatory,
                      boolean immediate) throws JMSException
     {
-        BasicPublishBody body = getSession().getMethodRegistry().createBasicPublishBody(_session.getTicket(),
+        BasicPublishBody body = getSession().getMethodRegistry().createBasicPublishBody(getSession().getTicket(),
                                                                                         destination.getExchangeName(),
                                                                                         destination.getRoutingKey(),
                                                                                         mandatory,
                                                                                         immediate);
 
-        AMQFrame publishFrame = body.generateFrame(_channelId);
+        AMQFrame publishFrame = body.generateFrame(getChannelId());
 
         message.prepareForSending();
         ByteBuffer payload = message.getData();
         AMQMessageDelegate_0_8 delegate = (AMQMessageDelegate_0_8) message.getDelegate();
         BasicContentHeaderProperties contentHeaderProperties = delegate.getContentHeaderProperties();
 
-        contentHeaderProperties.setUserId(_userID);
+        contentHeaderProperties.setUserId(getUserID());
 
         //Set the JMS_QPID_DESTTYPE for 0-8/9 messages
         int type;
@@ -110,7 +112,7 @@ public class BasicMessageProducer_0_8 ex
         //Set JMS_QPID_DESTTYPE
         delegate.getContentHeaderProperties().getHeaders().setInteger(CustomJMSXProperty.JMS_QPID_DESTTYPE.getShortStringName(), type);
 
-        if (!_disableTimestamps)
+        if (!isDisableTimestamps())
         {
             final long currentTime = System.currentTimeMillis();
             contentHeaderProperties.setTimestamp(currentTime);
@@ -134,12 +136,12 @@ public class BasicMessageProducer_0_8 ex
 
         if (payload != null)
         {
-            createContentBodies(payload, frames, 2, _channelId);
+            createContentBodies(payload, frames, 2, getChannelId());
         }
 
-        if ((contentBodyFrameCount != 0) && _logger.isDebugEnabled())
+        if ((contentBodyFrameCount != 0) && getLogger().isDebugEnabled())
         {
-            _logger.debug("Sending content body frames to " + destination);
+            getLogger().debug("Sending content body frames to " + destination);
         }
 
 
@@ -147,11 +149,11 @@ public class BasicMessageProducer_0_8 ex
         int classIfForBasic = getSession().getMethodRegistry().createBasicQosOkBody().getClazz();
 
         AMQFrame contentHeaderFrame =
-            ContentHeaderBody.createAMQFrame(_channelId,
+            ContentHeaderBody.createAMQFrame(getChannelId(),
                                              classIfForBasic, 0, contentHeaderProperties, size);
-        if (_logger.isDebugEnabled())
+        if (getLogger().isDebugEnabled())
         {
-            _logger.debug("Sending content header frame to " + destination);
+            getLogger().debug("Sending content header frame to " + destination);
         }
 
         frames[0] = publishFrame;
@@ -160,7 +162,7 @@ public class BasicMessageProducer_0_8 ex
 
         try
         {
-            _session.checkFlowControl();
+            getSession().checkFlowControl();
         }
         catch (InterruptedException e)
         {
@@ -170,7 +172,7 @@ public class BasicMessageProducer_0_8 ex
             throw jmse;
         }
 
-        _protocolHandler.writeFrame(compositeFrame);
+        getProtocolHandler().writeFrame(compositeFrame);
     }
 
     /**
@@ -194,7 +196,7 @@ public class BasicMessageProducer_0_8 ex
         else
         {
 
-            final long framePayloadMax = _session.getAMQConnection().getMaximumFrameSize() - 1;
+            final long framePayloadMax = getSession().getAMQConnection().getMaximumFrameSize() - 1;
             long remaining = payload.remaining();
             for (int i = offset; i < frames.length; i++)
             {
@@ -224,7 +226,7 @@ public class BasicMessageProducer_0_8 ex
         else
         {
             int dataLength = payload.remaining();
-            final long framePayloadMax = _session.getAMQConnection().getMaximumFrameSize() - 1;
+            final long framePayloadMax = getSession().getAMQConnection().getMaximumFrameSize() - 1;
             int lastFrame = ((dataLength % framePayloadMax) > 0) ? 1 : 0;
             frameCount = (int) (dataLength / framePayloadMax) + lastFrame;
         }

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/Closeable.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/Closeable.java?rev=1299257&r1=1299256&r2=1299257&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/Closeable.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/Closeable.java Sat Mar 10 19:22:10 2012
@@ -22,7 +22,6 @@ package org.apache.qpid.client;
 
 import javax.jms.IllegalStateException;
 import javax.jms.JMSException;
-
 import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
@@ -49,14 +48,14 @@ public abstract class Closeable
      * We use an atomic boolean so that we do not have to synchronized access to this flag. Synchronizing access to this
      * flag would mean have a synchronized block in every method.
      */
-    protected final AtomicBoolean _closed = new AtomicBoolean(false);
+    private final AtomicBoolean _closed = new AtomicBoolean(false);
 
     /**
      * Are we in the process of closing. We have this distinction so we can
      * still signal we are in the process of closing so other objects can tell
      * the difference and tidy up.
      */
-    protected final AtomicBoolean _closing = new AtomicBoolean(false);
+    private final AtomicBoolean _closing = new AtomicBoolean(false);
 
     /**
      * Checks if this is closed, and raises a JMSException if it is.
@@ -91,6 +90,15 @@ public abstract class Closeable
         return _closing.get();
     }
 
+    protected boolean setClosed()
+    {
+        return _closed.getAndSet(true);
+    }
+
+    protected void setClosing(boolean closing)
+    {
+        _closing.set(closing);
+    }
 
     /**
      * Closes this object.

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/CustomJMSXProperty.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/CustomJMSXProperty.java?rev=1299257&r1=1299256&r2=1299257&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/CustomJMSXProperty.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/CustomJMSXProperty.java Sat Mar 10 19:22:10 2012
@@ -20,13 +20,13 @@
  */
 package org.apache.qpid.client;
 
+import org.apache.qpid.framing.AMQShortString;
+
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Enumeration;
 import java.util.List;
 
-import org.apache.qpid.framing.AMQShortString;
-
 public enum CustomJMSXProperty
 {
     JMS_AMQP_NULL,

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/MessageConsumerPair.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/MessageConsumerPair.java?rev=1299257&r1=1299256&r2=1299257&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/MessageConsumerPair.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/MessageConsumerPair.java Sat Mar 10 19:22:10 2012
@@ -22,8 +22,8 @@ package org.apache.qpid.client;
 
 public class MessageConsumerPair
 {
-    BasicMessageConsumer _consumer;
-    Object _item;
+    private BasicMessageConsumer _consumer;
+    private Object _item;
 
     public MessageConsumerPair(BasicMessageConsumer consumer, Object item)
     {

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/QpidConnectionMetaData.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/QpidConnectionMetaData.java?rev=1299257&r1=1299256&r2=1299257&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/QpidConnectionMetaData.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/client/src/main/java/org/apache/qpid/client/QpidConnectionMetaData.java Sat Mar 10 19:22:10 2012
@@ -20,12 +20,11 @@
  */
 package org.apache.qpid.client;
 
-import java.util.Enumeration;
+import org.apache.qpid.common.QpidProperties;
 
 import javax.jms.ConnectionMetaData;
 import javax.jms.JMSException;
-
-import org.apache.qpid.common.QpidProperties;
+import java.util.Enumeration;
 
 public class QpidConnectionMetaData implements ConnectionMetaData
 {



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org