You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2006/12/18 10:11:14 UTC

svn commit: r488163 [2/4] - in /incubator/qpid/branches/new_persistence/java: ./ broker/ broker/bin/ broker/etc/ broker/src/main/java/org/apache/qpid/server/ broker/src/main/java/org/apache/qpid/server/exchange/ broker/src/main/java/org/apache/qpid/ser...

Modified: incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java?view=diff&rev=488163&r1=488162&r2=488163
==============================================================================
--- incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java (original)
+++ incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java Mon Dec 18 01:10:59 2006
@@ -36,7 +36,10 @@
 import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.state.AMQStateManager;
 
-import javax.management.*;
+import javax.management.JMException;
+import javax.management.MBeanException;
+import javax.management.MBeanNotificationInfo;
+import javax.management.Notification;
 import javax.management.monitor.MonitorNotification;
 import javax.management.openmbean.*;
 import javax.security.sasl.SaslServer;
@@ -97,66 +100,35 @@
      * augment the ManagedConnection interface and add the appropriate implementation here.
      */
     @MBeanDescription("Management Bean for an AMQ Broker Connection")
-    private final class ManagedAMQProtocolSession extends AMQManagedObject
-                                                  implements ManagedConnection
+    private final class ManagedAMQProtocolSession extends AMQManagedObject implements ManagedConnection
     {
         private String _name = null;
-        /**
-         * Represents the channel attributes sent with channel data.
-         */
-        private String[] _channelAtttibuteNames = { "ChannelId",
-                                                    "Transactional",
-                                                    "DefaultQueue",
-                                                    "UnacknowledgedMessageCount"};
-        private String[] _channelAttributeDescriptions = { "Channel Identifier",
-                                                           "is Channel Transactional?",
-                                                           "Default Queue Name",
-                                                           "Unacknowledged Message Count"};
-        private OpenType[] _channelAttributeTypes = { SimpleType.INTEGER,
-                                                      SimpleType.BOOLEAN,
-                                                      SimpleType.STRING,
-                                                      SimpleType.INTEGER};
-
-        private String[] _indexNames = { "ChannelId" };  //Channels in the list will be indexed according to channelId.
-        private CompositeType _channelType = null;       // represents the data type for channel data
-        private TabularType  _channelsType = null;       // Datatype for list of channelsType
+        //openmbean data types for representing the channel attributes
+        private String[] _channelAtttibuteNames = { "Channel Id", "Transactional", "Default Queue", "Unacknowledged Message Count"};
+        private String[] _indexNames = {_channelAtttibuteNames[0]};
+        private OpenType[] _channelAttributeTypes = {SimpleType.INTEGER, SimpleType.BOOLEAN, SimpleType.STRING, SimpleType.INTEGER};
+        private CompositeType _channelType = null;      // represents the data type for channel data
+        private TabularType _channelsType = null;       // Data type for list of channels type
         private TabularDataSupport _channelsList = null;
 
         @MBeanConstructor("Creates an MBean exposing an AMQ Broker Connection")
-        public ManagedAMQProtocolSession() throws NotCompliantMBeanException
+        public ManagedAMQProtocolSession() throws JMException
         {
             super(ManagedConnection.class, ManagedConnection.TYPE);
             init();
         }
 
         /**
-         * initialises the CompositeTypes and TabularType attributes.
+         * initialises the openmbean data types
          */
-        private void init()
+        private void init() throws OpenDataException
         {
             String remote = getRemoteAddress();
             remote = "anonymous".equals(remote) ? remote + hashCode() : remote;
             _name = jmxEncode(new StringBuffer(remote), 0).toString();
-
-            try
-            {
-                _channelType = new CompositeType("channel",
-                                              "Channel Details",
-                                              _channelAtttibuteNames,
-                                              _channelAttributeDescriptions,
-                                              _channelAttributeTypes);
-
-                _channelsType = new TabularType("channelsType",
-                                       "List of available channels",
-                                       _channelType,
-                                       _indexNames);
-            }
-            catch(OpenDataException ex)
-            {
-                // It should never occur.
-                _logger.error("OpenDataTypes could not be created.", ex);
-                throw new RuntimeException(ex);
-            }
+            _channelType = new CompositeType("Channel", "Channel Details", _channelAtttibuteNames,
+                                             _channelAtttibuteNames, _channelAttributeTypes);
+            _channelsType = new TabularType("Channels", "Channels", _channelType, _indexNames);
         }
 
         public Date getLastIoTime()
@@ -179,12 +151,12 @@
             return _minaProtocolSession.getReadBytes();
         }
 
-        public Long getMaximumNumberOfAllowedChannels()
+        public Long getMaximumNumberOfChannels()
         {
             return _maxNoOfChannels;
         }
 
-        public void setMaximumNumberOfAllowedChannels(Long value)
+        public void setMaximumNumberOfChannels(Long value)
         {
             _maxNoOfChannels = value;
         }
@@ -239,30 +211,25 @@
          * @return  list of channels in tabular form.
          * @throws OpenDataException
          */
-        public TabularData getChannels() throws OpenDataException
+        public TabularData channels() throws OpenDataException
         {
             _channelsList = new TabularDataSupport(_channelsType);
 
             for (Map.Entry<Integer, AMQChannel> entry : _channelMap.entrySet())
             {
                 AMQChannel channel = entry.getValue();
-                Object[] itemValues = {channel.getChannelId(),
-                                       channel.isTransactional(),
+                Object[] itemValues = {channel.getChannelId(), channel.isTransactional(),
                                        (channel.getDefaultQueue() != null) ? channel.getDefaultQueue().getName() : null,
                                        channel.getUnacknowledgedMessageMap().size()};
 
-                CompositeData channelData = new CompositeDataSupport(_channelType,
-                                                            _channelAtttibuteNames,
-                                                            itemValues);
-
+                CompositeData channelData = new CompositeDataSupport(_channelType, _channelAtttibuteNames, itemValues);
                 _channelsList.put(channelData);
             }
 
             return _channelsList;
         }
-
-        public void closeChannel(int id)
-            throws Exception
+        
+        public void closeChannel(int id) throws Exception
         {
             try
             {
@@ -274,8 +241,7 @@
             }
         }
 
-        public void closeConnection()
-            throws Exception
+        public void closeConnection() throws Exception
         {
             try
             {
@@ -290,13 +256,10 @@
         @Override
         public MBeanNotificationInfo[] getNotificationInfo()
         {
-            String[] notificationTypes = new String[]
-                                {MonitorNotification.THRESHOLD_VALUE_EXCEEDED};
+            String[] notificationTypes = new String[] {MonitorNotification.THRESHOLD_VALUE_EXCEEDED};
             String name = MonitorNotification.class.getName();
-            String description = "An attribute of this MBean has reached threshold value";
-            MBeanNotificationInfo info1 = new MBeanNotificationInfo(notificationTypes,
-                                                                    name,
-                                                                    description);
+            String description = "Channel count has reached threshold value";
+            MBeanNotificationInfo info1 = new MBeanNotificationInfo(notificationTypes, name, description);
 
             return new MBeanNotificationInfo[] {info1};
         }
@@ -304,14 +267,11 @@
         private void checkForNotification()
         {
             int channelsCount = _channelMap.size();
-            if (channelsCount >= getMaximumNumberOfAllowedChannels())
+            if (channelsCount >= getMaximumNumberOfChannels())
             {
-                Notification n = new Notification(
-                        MonitorNotification.THRESHOLD_VALUE_EXCEEDED,
-                        this,
-                        ++_notificationSequenceNumber,
-                        System.currentTimeMillis(),
-                        "ChannelsCount = " + channelsCount + ", ChannelsCount has reached the threshold value");
+                Notification n = new Notification(MonitorNotification.THRESHOLD_VALUE_EXCEEDED, this,
+                                     ++_notificationSequenceNumber, System.currentTimeMillis(),
+                                     "Channel count (" + channelsCount + ") has reached the threshold value");
 
                 _broadcaster.sendNotification(n);
             }
@@ -347,10 +307,10 @@
         {
             return new ManagedAMQProtocolSession();
         }
-        catch(NotCompliantMBeanException ex)
+        catch(JMException ex)
         {
-            _logger.error("AMQProtocolSession MBean creation has failed.", ex);
-            throw new AMQException("AMQProtocolSession MBean creation has failed.", ex);
+            _logger.error("AMQProtocolSession MBean creation has failed ", ex);
+            throw new AMQException("AMQProtocolSession MBean creation has failed ", ex);
         }
     }
 
@@ -389,6 +349,11 @@
                 int i = pv.length - 1;
                 _minaProtocolSession.write(new ProtocolInitiation(pv[i][PROTOCOL_MAJOR], pv[i][PROTOCOL_MINOR]));
                 // TODO: Close connection (but how to wait until message is sent?)
+                // ritchiem 2006-12-04 will this not do?
+//                WriteFuture future = _minaProtocolSession.write(new ProtocolInitiation(pv[i][PROTOCOL_MAJOR], pv[i][PROTOCOL_MINOR]));
+//                future.join();
+//                close connection
+
             }
         }
         else

Modified: incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/protocol/ManagedConnection.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/protocol/ManagedConnection.java?view=diff&rev=488163&r1=488162&r2=488163
==============================================================================
--- incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/protocol/ManagedConnection.java (original)
+++ incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/protocol/ManagedConnection.java Mon Dec 18 01:10:59 2006
@@ -41,83 +41,57 @@
     static final String TYPE = "Connection";
 
     /**
-     * channel details of all the channels opened for this connection.
-     * @return general channel details
-     * @throws IOException
-     * @throws JMException
+     * Tells the remote address of this connection.
+     * @return  remote address
      */
-    @MBeanAttribute(name="Channels",
-                         description="channel details of all the channels opened for this connection")
-    TabularData getChannels() throws IOException, JMException;
+    @MBeanAttribute(name="RemoteAddress", description=TYPE + " Address")
+    String getRemoteAddress();
 
     /**
      * Tells the last time, the IO operation was done.
      * @return last IO time.
      */
-    @MBeanAttribute(name="LastIOTime",
-                         description="The last time, the IO operation was done")
+    @MBeanAttribute(name="LastIOTime", description="The last time, the IO operation was done")
     Date getLastIoTime();
 
     /**
-     * Tells the remote address of this connection.
-     * @return  remote address
-     */
-    @MBeanAttribute(name="RemoteAddress",
-                         description="The remote address of this connection")
-    String getRemoteAddress();
-
-    /**
      * Tells the total number of bytes written till now.
      * @return number of bytes written.
      */
-    @MBeanAttribute(name="WrittenBytes",
-                         description="The total number of bytes written till now")
+    @MBeanAttribute(name="WrittenBytes", description="The total number of bytes written till now")
     Long getWrittenBytes();
 
     /**
      * Tells the total number of bytes read till now.
      * @return number of bytes read.
      */
-    @MBeanAttribute(name="ReadBytes",
-                         description="The total number of bytes read till now")
+    @MBeanAttribute(name="ReadBytes", description="The total number of bytes read till now")
     Long getReadBytes();
 
     /**
-     * Tells the maximum number of channels that can be opened using
-     * this connection.  This is useful in setting notifications or
+     * Threshold high value for the number of channels.  This is useful in setting notifications or
      * taking required action is there are more channels being created.
-     * @return maximum number of channels allowed to be created.
+     * @return threshold limit for the number of channels
      */
-    Long getMaximumNumberOfAllowedChannels();
+    Long getMaximumNumberOfChannels();
 
     /**
-     * Sets the maximum number of channels allowed to be created using
-     * this connection.
+     * Sets the threshold high value for number of channels for a connection
      * @param value
      */
-    @MBeanAttribute(name="MaximumNumberOfAllowedChannels",
-                             description="The maximum number of channels that can be opened using this connection")    
-    void setMaximumNumberOfAllowedChannels(Long value);
+    @MBeanAttribute(name="MaximumNumberOfChannels", description="The threshold high value for number of channels for this connection")
+    void setMaximumNumberOfChannels(Long value);
 
     //********** Operations *****************//
 
     /**
-     * Closes all the related channels and unregisters this connection from managed objects.
-     */
-    @MBeanOperation(name="closeConnection",
-                         description="Closes this connection and all related channels",
-                         impact= MBeanOperationInfo.ACTION)
-    void closeConnection() throws Exception;
-
-    /**
-     * Unsubscribes the consumers and unregisters the channel from managed objects.
+     * channel details of all the channels opened for this connection.
+     * @return general channel details
+     * @throws IOException
+     * @throws JMException
      */
-    @MBeanOperation(name="closeChannel",
-                         description="Closes the channel with given channeld and" +
-                                     "connected consumers will be unsubscribed",
-                         impact= MBeanOperationInfo.ACTION)
-    void closeChannel(@MBeanOperationParameter(name="channel Id", description="channel Id")int channelId)
-        throws Exception;
+    @MBeanOperation(name="channels", description="Channel details for this connection")
+    TabularData channels() throws IOException, JMException;
 
     /**
      * Commits the transactions if the channel is transactional.
@@ -125,8 +99,8 @@
      * @throws JMException
      */
     @MBeanOperation(name="commitTransaction",
-                         description="Commits the transactions for given channelID, if the channel is transactional",
-                         impact= MBeanOperationInfo.ACTION)
+                    description="Commits the transactions for given channel Id, if the channel is transactional",
+                    impact= MBeanOperationInfo.ACTION)
     void commitTransactions(@MBeanOperationParameter(name="channel Id", description="channel Id")int channelId) throws JMException;
 
     /**
@@ -135,7 +109,24 @@
      * @throws JMException
      */
     @MBeanOperation(name="rollbackTransactions",
-                         description="Rollsback the transactions for given channelId, if the channel is transactional",
-                         impact= MBeanOperationInfo.ACTION)
+                    description="Rollsback the transactions for given channel Id, if the channel is transactional",
+                    impact= MBeanOperationInfo.ACTION)
     void rollbackTransactions(@MBeanOperationParameter(name="channel Id", description="channel Id")int channelId) throws JMException;
+
+    /**
+     * Unsubscribes the consumers and unregisters the channel from managed objects.
+     */
+    @MBeanOperation(name="closeChannel",
+                    description="Closes the channel with given channel Id and connected consumers will be unsubscribed",
+                    impact= MBeanOperationInfo.ACTION)
+    void closeChannel(@MBeanOperationParameter(name="channel Id", description="channel Id")int channelId)
+        throws Exception;
+
+    /**
+     * Closes all the related channels and unregisters this connection from managed objects.
+     */
+    @MBeanOperation(name="closeConnection",
+                    description="Closes this connection and all related channels",
+                    impact= MBeanOperationInfo.ACTION)
+    void closeConnection() throws Exception;
 }

Modified: incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?view=diff&rev=488163&r1=488162&r2=488163
==============================================================================
--- incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java (original)
+++ incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java Mon Dec 18 01:10:59 2006
@@ -23,6 +23,7 @@
 import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.ContentBody;
+import org.apache.qpid.framing.ContentHeaderBody;
 import org.apache.qpid.framing.BasicContentHeaderProperties;
 import org.apache.qpid.server.exchange.Exchange;
 import org.apache.qpid.server.management.*;
@@ -91,19 +92,19 @@
     private final AMQQueueMBean _managedObject;
 
     /**
-     * max allowed size of a single message(in KBytes).
+     * max allowed size(KB) of a single message
      */
-    private long _maxAllowedMessageSize = 10000;  // 10 MB
+    private long _maxMessageSize = 10000;
 
     /**
      * max allowed number of messages on a queue.
      */
-    private Integer _maxAllowedMessageCount = 10000;
+    private Integer _maxMessageCount = 10000;
 
     /**
-     * max allowed size in  KBytes for all the messages combined together in a queue.
+     * max queue depth(KB) for the queue
      */
-    private long _queueDepth = 10000000;          //   10 GB
+    private long _maxQueueDepth = 10000000;
 
     /**
      * total messages received by the queue since startup.
@@ -123,83 +124,45 @@
     private final class AMQQueueMBean extends AMQManagedObject implements ManagedQueue
     {
         private String _queueName = null;
+        // OpenMBean data types for viewMessages method
+        private String[] _msgAttributeNames = {"Message Id", "Header", "Size(bytes)", "Redelivered"};
+        private String[] _msgAttributeIndex = {_msgAttributeNames[0]};
+        private OpenType[] _msgAttributeTypes = new OpenType[4]; // AMQ message attribute types.
+        private CompositeType _messageDataType = null;           // Composite type for representing AMQ Message data.
+        private TabularType _messagelistDataType = null;         // Datatype for representing AMQ messages list.
+
+        // OpenMBean data types for viewMessageContent method
+        private CompositeType _msgContentType = null;
+        private String[]      _msgContentAttributes = {"Message Id", "MimeType", "Encoding", "Content"};
+        private OpenType[]    _msgContentAttributeTypes = new OpenType[4];
 
-        // AMQ message attribute names
-        private String[] _msgAttributeNames = {"MessageId",
-                                               "Header",
-                                               "Size",
-                                               "Redelivered"
-        };
-        // AMQ Message attribute descriptions.
-        private String[] _msgAttributeDescriptions = {"Message Id",
-                                                      "Header",
-                                                      "Message size in bytes",
-                                                      "Redelivered"
-        };
-
-        private OpenType[] _msgAttributeTypes = new OpenType[4];  // AMQ message attribute types.
-        private String[] _msgAttributeIndex = {"MessageId"};    // Messages will be indexed according to the messageId.
-        private CompositeType _messageDataType = null;            // Composite type for representing AMQ Message data.
-        private TabularType _messagelistDataType = null;        // Datatype for representing AMQ messages list.
-
-
-        private CompositeType _msgContentType = null;    // For message content
-        private String[] _msgContentAttributes = {"MessageId",
-                                                  "MimeType",
-                                                  "Encoding",
-                                                  "Content"
-        };
-        private String[] _msgContentDescriptions = {"Message Id",
-                                                    "MimeType",
-                                                    "Encoding",
-                                                    "Message content"
-        };
-        private OpenType[] _msgContentAttributeTypes = new OpenType[4];
-
-
-        @MBeanConstructor("Creates an MBean exposing an AMQQueue.")
-        public AMQQueueMBean() throws NotCompliantMBeanException
+        @MBeanConstructor("Creates an MBean exposing an AMQQueue")
+        public AMQQueueMBean() throws JMException
         {
             super(ManagedQueue.class, ManagedQueue.TYPE);
             init();
         }
 
-        private void init()
+        /**
+         * initialises the openmbean data types
+         */
+        private void init() throws OpenDataException
         {
             _queueName = jmxEncode(new StringBuffer(_name), 0).toString();
-            try
-            {
-                _msgContentAttributeTypes[0] = SimpleType.LONG;                    // For message id
-                _msgContentAttributeTypes[1] = SimpleType.STRING;                  // For MimeType
-                _msgContentAttributeTypes[2] = SimpleType.STRING;                  // For Encoding
-                _msgContentAttributeTypes[3] = new ArrayType(1, SimpleType.BYTE);  // For message content
-                _msgContentType = new CompositeType("MessageContent",
-                                                    "AMQ Message Content",
-                                                    _msgContentAttributes,
-                                                    _msgContentDescriptions,
-                                                    _msgContentAttributeTypes);
-
-
-                _msgAttributeTypes[0] = SimpleType.LONG;                      // For message id
-                _msgAttributeTypes[1] = new ArrayType(1, SimpleType.STRING);  // For header attributes
-                _msgAttributeTypes[2] = SimpleType.LONG;                      // For size
-                _msgAttributeTypes[3] = SimpleType.BOOLEAN;                   // For redelivered
-
-                _messageDataType = new CompositeType("Message",
-                                                     "AMQ Message",
-                                                     _msgAttributeNames,
-                                                     _msgAttributeDescriptions,
-                                                     _msgAttributeTypes);
-                _messagelistDataType = new TabularType("Messages",
-                                                       "List of messages",
-                                                       _messageDataType,
-                                                       _msgAttributeIndex);
-            }
-            catch (OpenDataException ex)
-            {
-                _logger.error("OpenDataTypes could not be created.", ex);
-                throw new RuntimeException(ex);
-            }
+            _msgContentAttributeTypes[0] = SimpleType.LONG;                    // For message id
+            _msgContentAttributeTypes[1] = SimpleType.STRING;                  // For MimeType
+            _msgContentAttributeTypes[2] = SimpleType.STRING;                  // For Encoding
+            _msgContentAttributeTypes[3] = new ArrayType(1, SimpleType.BYTE);  // For message content
+            _msgContentType = new CompositeType("Message Content", "AMQ Message Content", _msgContentAttributes,
+                                                 _msgContentAttributes, _msgContentAttributeTypes);
+
+            _msgAttributeTypes[0] = SimpleType.LONG;                      // For message id
+            _msgAttributeTypes[1] = new ArrayType(1, SimpleType.STRING);  // For header attributes
+            _msgAttributeTypes[2] = SimpleType.LONG;                      // For size
+            _msgAttributeTypes[3] = SimpleType.BOOLEAN;                   // For redelivered
+
+            _messageDataType = new CompositeType("Message","AMQ Message", _msgAttributeNames, _msgAttributeNames, _msgAttributeTypes);
+            _messagelistDataType = new TabularType("Messages", "List of messages", _messageDataType, _msgAttributeIndex);
         }
 
         public String getObjectInstanceName()
@@ -234,12 +197,12 @@
 
         public Long getMaximumMessageSize()
         {
-            return _maxAllowedMessageSize;
+            return _maxMessageSize;
         }
 
         public void setMaximumMessageSize(Long value)
         {
-            _maxAllowedMessageSize = value;
+            _maxMessageSize = value;
         }
 
         public Integer getConsumerCount()
@@ -259,27 +222,29 @@
 
         public Integer getMaximumMessageCount()
         {
-            return _maxAllowedMessageCount;
+            return _maxMessageCount;
         }
 
         public void setMaximumMessageCount(Integer value)
         {
-            _maxAllowedMessageCount = value;
+            _maxMessageCount = value;
         }
 
-        public Long getQueueDepth()
+        public Long getMaximumQueueDepth()
         {
-            return _queueDepth;
+            return _maxQueueDepth;
         }
 
         // Sets the queue depth, the max queue size
-        public void setQueueDepth(Long value)
+        public void setMaximumQueueDepth(Long value)
         {
-           _queueDepth = value;
+            _maxQueueDepth = value;
         }
 
-        // Returns the size of messages in the queue
-        public Long getQueueSize() throws AMQException
+        /**
+         * returns the size of messages(KB) in the queue.
+         */
+        public Long getQueueDepth()
         {
             /* TODO: this must return a maintained count not
              * iterate through all messages
@@ -291,17 +256,19 @@
                 return 0l;
             }
 
-            long queueSize = 0;
+            long queueDepth = 0;
             for (AMQMessage message : list)
             {
-                queueSize = queueSize + getMessageSize(message);
+                queueDepth = queueDepth + getMessageSize(message);
             }
-            return new Long(Math.round(queueSize / 100));
+            return (long)Math.round(queueDepth / 1000);
             */
         }
         // Operations
 
-        // calculates the size of an AMQMessage
+        /**
+         * returns size of message in bytes
+         */
         private long getMessageSize(AMQMessage msg) throws AMQException
         {
             if (msg == null)
@@ -312,41 +279,40 @@
             return msg.getContentHeaderBody().bodySize;
         }
 
-        // Checks if there is any notification to be send to the listeners
+        /**
+         * Checks if there is any notification to be sent to the listeners
+         */
         private void checkForNotification(AMQMessage msg) throws AMQException
         {
-            // Check for message count
+            // Check for threshold message count
             Integer msgCount = getMessageCount();
             if (msgCount >= getMaximumMessageCount())
             {
-                notifyClients("MessageCount = " + msgCount + ", Queue has reached its size limit and is now full.");
+                notifyClients("Message count(" + msgCount + ") has reached or exceeded the threshold high value");
             }
 
-            // Check for received message size
+            // Check for threshold message size
             long messageSize = getMessageSize(msg);
-            if (messageSize >= getMaximumMessageSize())
+            if (messageSize >= _maxMessageSize)
             {
-                notifyClients("MessageSize = " + messageSize + ", Message size (MessageID=" + msg.getMessageId() +
-                              ")is higher than the threshold value");
+                notifyClients("Message size(ID=" + msg.getMessageId() + ", size=" + messageSize + " bytes) is higher than the threshold value");
             }
 
-            // Check for queue size in bytes
-            long queueSize = getQueueSize();
-            if (queueSize >= getQueueDepth())
+            // Check for threshold queue depth (KB)
+            long queueDepth = getQueueDepth();
+            if (queueDepth >= _maxQueueDepth)
             {
-                notifyClients("QueueSize = " + queueSize + ", Queue size has reached the threshold value");
+                notifyClients("Queue depth(" + queueDepth + "), Queue size has reached the threshold high value");
             }
         }
 
-        // Send the notification to the listeners
+        /**
+         * Sends the notification to the listeners
+         */
         private void notifyClients(String notificationMsg)
         {
-            Notification n = new Notification(
-                    MonitorNotification.THRESHOLD_VALUE_EXCEEDED,
-                    this,
-                    ++_notificationSequenceNumber,
-                    System.currentTimeMillis(),
-                    notificationMsg);
+            Notification n = new Notification(MonitorNotification.THRESHOLD_VALUE_EXCEEDED, this,
+                                 ++_notificationSequenceNumber, System.currentTimeMillis(), notificationMsg);
 
             _broadcaster.sendNotification(n);
         }
@@ -375,10 +341,12 @@
             }
         }
 
-        public CompositeData viewMessageContent(long msgId) throws JMException, AMQException
+        /**
+         * returns message content as byte array and related attributes for the given message id.
+         */
+        public CompositeData viewMessageContent(long msgId) throws JMException
         {
             List<AMQMessage> list = _deliveryMgr.getMessages();
-            CompositeData messageContent = null;
             AMQMessage msg = null;
             for (AMQMessage message : list)
             {
@@ -391,70 +359,65 @@
 
             if (msg != null)
             {
-                // get message content
-                Iterator<ContentBody> cBodies = msg.getContentBodyIterator();
-                List<Byte> msgContent = new ArrayList<Byte>();
-                while (cBodies.hasNext())
+                try
                 {
-                    ContentBody body = cBodies.next();
-                    if (body.getSize() != 0)
+                    // get message content
+                    Iterator<ContentBody> cBodies = msg.getContentBodyIterator();
+                    List<Byte> msgContent = new ArrayList<Byte>();
+                    while (cBodies.hasNext())
                     {
-                        ByteBuffer slice = body.payload.slice();
-                        for (int j = 0; j < slice.limit(); j++)
+                        ContentBody body = cBodies.next();
+                        if (body.getSize() != 0)
                         {
-                            msgContent.add(slice.get());
+                            ByteBuffer slice = body.payload.slice();
+                            for (int j = 0; j < slice.limit(); j++)
+                            {
+                                msgContent.add(slice.get());
+                            }
                         }
                     }
-                }
 
-                // Create header attributes list
-                BasicContentHeaderProperties headerProperties = (BasicContentHeaderProperties) msg.getContentHeaderBody().properties;
-                String mimeType = headerProperties.getContentType();
-                String encoding = headerProperties.getEncoding() == null ? "" : headerProperties.getEncoding();
+                    // Create header attributes list
+                    BasicContentHeaderProperties headerProperties = (BasicContentHeaderProperties)msg.getContentHeaderBody().properties;
+                    String mimeType = headerProperties.getContentType();
+                    String encoding = headerProperties.getEncoding() == null ? "" : headerProperties.getEncoding();
+                    Object[] itemValues = {msgId, mimeType, encoding, msgContent.toArray(new Byte[0])};
 
-                Object[] itemValues = {msgId, mimeType, encoding, msgContent.toArray(new Byte[0])};
-                messageContent = new CompositeDataSupport(_msgContentType, _msgContentAttributes, itemValues);
+                    return new CompositeDataSupport(_msgContentType, _msgContentAttributes, itemValues);
+                }
+                catch (AMQException e)
+                {
+                    throw new JMException(e.getMessage());
+                }
             }
             else
             {
-                throw new JMException("AMQMessage with message id = " + msgId + " is not in the " + _queueName);
+                throw new OperationsException("AMQMessage with message id = " + msgId + " is not in the " + _queueName);
             }
-
-            return messageContent;
         }
 
         /**
-         * Returns the messages stored in this queue in tabular form.
-         *
-         * @param beginIndex
-         * @param endIndex
-         * @return AMQ messages in tabular form.
-         * @throws JMException
+         * Returns the header contents of the messages stored in this queue in tabular form.
          */
         public TabularData viewMessages(int beginIndex, int endIndex) throws JMException, AMQException
         {
             if ((beginIndex > endIndex) || (beginIndex < 1))
             {
-                throw new JMException("FromIndex = " + beginIndex + ", ToIndex = " + endIndex +
-                                      "\nFromIndex should be greater than 0 and less than ToIndex");
+                throw new JMException("From Index = " + beginIndex + ", To Index = " + endIndex +
+                                      "\nFrom Index should be greater than 0 and less than To Index");
             }
 
             List<AMQMessage> list = _deliveryMgr.getMessages();
             TabularDataSupport _messageList = new TabularDataSupport(_messagelistDataType);
 
-            if (beginIndex > list.size())
-            {
-                return _messageList;
-            }
-            endIndex = endIndex < list.size() ? endIndex : list.size();
-
-            for (int i = beginIndex; i <= endIndex; i++)
+            // Create the tabular list of message header contents
+            for (int i = beginIndex; i <= endIndex && i <=  list.size(); i++)
             {
                 AMQMessage msg = list.get(i - 1);
-                long size = msg.getContentHeaderBody().bodySize;
-
+                ContentHeaderBody headerBody = msg.getContentHeaderBody();
+                long size = headerBody.bodySize;
                 // Create header attributes list
-                BasicContentHeaderProperties headerProperties = (BasicContentHeaderProperties) msg.getContentHeaderBody().properties;
+                BasicContentHeaderProperties headerProperties = (BasicContentHeaderProperties)headerBody.properties;
                 List<String> headerAttribsList = new ArrayList<String>();
                 headerAttribsList.add("App Id=" + headerProperties.getAppId());
                 headerAttribsList.add("MimeType=" + headerProperties.getContentType());
@@ -462,13 +425,9 @@
                 headerAttribsList.add("Encoding=" + headerProperties.getEncoding());
                 headerAttribsList.add(headerProperties.toString());
 
-                Object[] itemValues = {msg.getMessageId(),
-                                       headerAttribsList.toArray(new String[0]),
-                                       size, msg.isRedelivered()};
-
-                CompositeData messageData = new CompositeDataSupport(_messageDataType,
-                                                                     _msgAttributeNames,
-                                                                     itemValues);
+                Object[] itemValues = {msg.getMessageId(), headerAttribsList.toArray(new String[0]),
+                                       headerBody.bodySize, msg.isRedelivered()};
+                CompositeData messageData = new CompositeDataSupport(_messageDataType, _msgAttributeNames, itemValues);
                 _messageList.put(messageData);
             }
 
@@ -476,20 +435,15 @@
         }
 
         /**
-         * Creates all the notifications this MBean can send.
-         *
-         * @return Notifications broadcasted by this MBean.
+         * returns Notifications sent by this MBean.
          */
         @Override
         public MBeanNotificationInfo[] getNotificationInfo()
         {
-            String[] notificationTypes = new String[]
-                    {MonitorNotification.THRESHOLD_VALUE_EXCEEDED};
+            String[] notificationTypes = new String[] {MonitorNotification.THRESHOLD_VALUE_EXCEEDED};
             String name = MonitorNotification.class.getName();
-            String description = "An attribute of this MBean has reached threshold value";
-            MBeanNotificationInfo info1 = new MBeanNotificationInfo(notificationTypes,
-                                                                    name,
-                                                                    description);
+            String description = "Either Message count or Queue depth or Message size has reached threshold high value";
+            MBeanNotificationInfo info1 = new MBeanNotificationInfo(notificationTypes, name, description);
 
             return new MBeanNotificationInfo[]{info1};
         }
@@ -591,9 +545,9 @@
         {
             return new AMQQueueMBean();
         }
-        catch (NotCompliantMBeanException ex)
+        catch (JMException ex)
         {
-            throw new AMQException("AMQQueue MBean creation has failed.", ex);
+            throw new AMQException("AMQQueue MBean creation has failed ", ex);
         }
     }
 

Modified: incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/ManagedQueue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/ManagedQueue.java?view=diff&rev=488163&r1=488162&r2=488163
==============================================================================
--- incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/ManagedQueue.java (original)
+++ incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/ManagedQueue.java Mon Dec 18 01:10:59 2006
@@ -46,16 +46,48 @@
      * @return the name of the managedQueue.
      * @throws IOException
      */
-    @MBeanAttribute(name="Name", description = "Name of the " + TYPE)
+    @MBeanAttribute(name="Name", description = TYPE + " Name")
     String getName() throws IOException;
 
     /**
-     * Tells whether this ManagedQueue is durable or not.
-     * @return true if this ManagedQueue is a durable queue.
+     * Total number of messages on the queue, which are yet to be delivered to the consumer(s).
+     * @return number of undelivered message in the Queue.
      * @throws IOException
      */
-    @MBeanAttribute(name="Durable", description = "true if the AMQQueue is durable")
-    boolean isDurable() throws IOException;
+    @MBeanAttribute(name="MessageCount", description = "Total number of undelivered messages on the queue")
+    Integer getMessageCount() throws IOException;
+
+    /**
+     * Tells the total number of messages received by the queue since startup.
+     * @return total number of messages received.
+     * @throws IOException
+     */
+    @MBeanAttribute(name="ReceivedMessageCount", description="The total number of messages receieved by the queue since startup")
+    Long getReceivedMessageCount() throws IOException;
+
+    /**
+     * Size of messages in the queue
+     * @return
+     * @throws IOException
+     */
+    @MBeanAttribute(name="QueueDepth", description="Size of messages(KB) in the queue")
+    Long getQueueDepth() throws IOException;
+
+    /**
+     *  Returns the total number of active subscribers to the queue.
+     * @return the number of active subscribers
+     * @throws IOException
+     */
+    @MBeanAttribute(name="ActiveConsumerCount", description="The total number of active subscribers to the queue")
+    Integer getActiveConsumerCount() throws IOException;
+
+    /**
+     * Returns the total number of subscribers to the queue.
+     * @return the number of subscribers.
+     * @throws IOException
+     */
+    @MBeanAttribute(name="ConsumerCount", description="The total number of subscribers to the queue")
+    Integer getConsumerCount() throws IOException;
 
     /**
      * Tells the Owner of the ManagedQueue.
@@ -66,21 +98,20 @@
     String getOwner() throws IOException;
 
     /**
-     * Tells if the ManagedQueue is set to AutoDelete.
-     * @return  true if the ManagedQueue is set to AutoDelete.
+     * Tells whether this ManagedQueue is durable or not.
+     * @return true if this ManagedQueue is a durable queue.
      * @throws IOException
      */
-    @MBeanAttribute(name="AutoDelete", description = "true if the AMQQueue is AutoDelete")
-    boolean isAutoDelete() throws IOException;
+    @MBeanAttribute(name="Durable", description = "true if the AMQQueue is durable")
+    boolean isDurable() throws IOException;
 
     /**
-     * Total number of messages on the queue, which are yet to be delivered to the consumer(s).
-     * @return number of undelivered message in the Queue.
+     * Tells if the ManagedQueue is set to AutoDelete.
+     * @return  true if the ManagedQueue is set to AutoDelete.
      * @throws IOException
      */
-    @MBeanAttribute(name="MessageCount",
-                         description = "Total number of undelivered messages on the queue")
-    Integer getMessageCount() throws IOException;
+    @MBeanAttribute(name="AutoDelete", description = "true if the AMQQueue is AutoDelete")
+    boolean isAutoDelete() throws IOException;
 
     /**
      * Returns the maximum size of a message (in kbytes) allowed to be accepted by the
@@ -99,36 +130,10 @@
      * @param size  maximum size of message.
      * @throws IOException
      */
-    @MBeanAttribute(name="MaximumMessageSize",
-                         description="Maximum size(KB) of a message allowed for this Queue")
+    @MBeanAttribute(name="MaximumMessageSize", description="Threshold high value(KB) for a message size")
     void setMaximumMessageSize(Long size) throws IOException;
 
     /**
-     * Returns the total number of subscribers to the queue.
-     * @return the number of subscribers.
-     * @throws IOException
-     */
-    @MBeanAttribute(name="ConsumerCount", description="The total number of subscribers to the queue")
-    Integer getConsumerCount() throws IOException;
-
-    /**
-     *  Returns the total number of active subscribers to the queue.
-     * @return the number of active subscribers
-     * @throws IOException
-     */
-    @MBeanAttribute(name="ActiveConsumerCount", description="The total number of active subscribers to the queue")
-    Integer getActiveConsumerCount() throws IOException;
-
-    /**
-     * Tells the total number of messages receieved by the queue since startup.
-     * @return total number of messages received.
-     * @throws IOException
-     */
-    @MBeanAttribute(name="ReceivedMessageCount",
-                         description="The total number of messages receieved by the queue since startup")
-    Long getReceivedMessageCount() throws IOException;
-
-    /**
      * Tells the maximum number of messages that can be stored in the queue.
      * This is useful in setting the notifications or taking required
      * action is the number of message increase this limit.
@@ -142,27 +147,16 @@
      * @param value  the maximum number of messages allowed to be stored in the queue.
      * @throws IOException
      */
-    @MBeanAttribute(name="MaximumMessageCount",
-                         description="The maximum number of messages allowed to be stored in the queue")
+    @MBeanAttribute(name="MaximumMessageCount", description="Threshold high value for number of undelivered messages in the queue")
     void setMaximumMessageCount(Integer value) throws IOException;
 
     /**
-     * Size of messages in the queue
-     * @return
-     * @throws IOException
-     */
-    @MBeanAttribute(name="QueueSize", description="Size of messages(KB) in the queue")
-    Long getQueueSize() throws IOException, AMQException;
-
-    /**
-     * Tells the maximum size of all the messages combined together,
-     * that can be stored in the queue. This is useful for setting notifications
-     * or taking required action if the size of messages stored in the queue
-     * increases over this limit.
-     * @return maximum size of the all the messages allowed for the queue.
+     * Tells the threshold high value for the queue depth. This is useful for setting notifications
+     * or taking required action if the size of messages stored in the queue increases over this limit.
+     * @return threshold high value for Queue Depth
      * @throws IOException
      */
-    Long getQueueDepth() throws IOException;
+    Long getMaximumQueueDepth() throws IOException;
 
     /**
      * Sets the maximum size of all the messages together, that can be stored
@@ -170,9 +164,8 @@
      * @param value
      * @throws IOException
      */
-    @MBeanAttribute(name="QueueDepth",
-                         description="The size(KB) of all the messages together, that can be stored in the queue")
-    void setQueueDepth(Long value) throws IOException;
+    @MBeanAttribute(name="MaximumQueueDepth", description="The threshold high value(KB) for Queue Depth")
+    void setMaximumQueueDepth(Long value) throws IOException;
 
 
 
@@ -189,19 +182,22 @@
      * @throws JMException
      */
     @MBeanOperation(name="viewMessages",
-                         description="shows messages in this queue with given indexes. eg. from index 1 - 100")
+                    description="Message headers for messages in this queue within given index range. eg. from index 1 - 100")
     TabularData viewMessages(@MBeanOperationParameter(name="from index", description="from index")int fromIndex,
                              @MBeanOperationParameter(name="to index", description="to index")int toIndex)
             throws IOException, JMException, AMQException;
 
+    @MBeanOperation(name="viewMessageContent", description="The message content for given Message Id")
+    CompositeData viewMessageContent(@MBeanOperationParameter(name="Message Id", description="Message Id")long messageId)
+        throws IOException, JMException;
+
     /**
      * Deletes the first message from top.
      * @throws IOException
      * @throws JMException
      */
-    @MBeanOperation(name="deleteMessageFromTop",
-                         description="Deletes the first message from top",
-                         impact= MBeanOperationInfo.ACTION)
+    @MBeanOperation(name="deleteMessageFromTop", description="Deletes the first message from top",
+                    impact= MBeanOperationInfo.ACTION)
     void deleteMessageFromTop() throws IOException, JMException;
 
     /**
@@ -210,12 +206,8 @@
      * @throws JMException
      */
     @MBeanOperation(name="clearQueue",
-                         description="Clears the queue by deleting all the undelivered messages from the queue",
-                         impact= MBeanOperationInfo.ACTION)
+                    description="Clears the queue by deleting all the undelivered messages from the queue",
+                    impact= MBeanOperationInfo.ACTION)
     void clearQueue() throws IOException, JMException;
 
-    @MBeanOperation(name="viewMessageContent",
-                         description="Returns the message content along with MimeType and Encoding")
-    CompositeData viewMessageContent(@MBeanOperationParameter(name="Message Id", description="Message Id")long messageId)
-            throws IOException, JMException, AMQException;
 }

Modified: incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/security/auth/amqplain/AmqPlainSaslServer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/security/auth/amqplain/AmqPlainSaslServer.java?view=diff&rev=488163&r1=488162&r2=488163
==============================================================================
--- incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/security/auth/amqplain/AmqPlainSaslServer.java (original)
+++ incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/security/auth/amqplain/AmqPlainSaslServer.java Mon Dec 18 01:10:59 2006
@@ -22,6 +22,7 @@
 
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.framing.AMQFrameDecodingException;
+import org.apache.qpid.framing.FieldTableFactory;
 import org.apache.mina.common.ByteBuffer;
 
 import javax.security.sasl.SaslServer;
@@ -54,7 +55,7 @@
     {
         try
         {
-            final FieldTable ft = new FieldTable(ByteBuffer.wrap(response), response.length);
+            final FieldTable ft = FieldTableFactory.newFieldTable(ByteBuffer.wrap(response), response.length);
             String username = (String) ft.get("LOGIN");
             // we do not care about the prompt but it throws if null
             NameCallback nameCb = new NameCallback("prompt", username);

Modified: incubator/qpid/branches/new_persistence/java/client/pom.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/client/pom.xml?view=diff&rev=488163&r1=488162&r2=488163
==============================================================================
--- incubator/qpid/branches/new_persistence/java/client/pom.xml (original)
+++ incubator/qpid/branches/new_persistence/java/client/pom.xml Mon Dec 18 01:10:59 2006
@@ -38,7 +38,6 @@
         <amqj.logging.level>warn</amqj.logging.level>
     </properties>
 
-
     <dependencies>
         <dependency>
             <groupId>org.apache.qpid</groupId>
@@ -49,7 +48,6 @@
             <artifactId>qpid-broker</artifactId>
         </dependency>
 
-
         <dependency>
             <groupId>commons-codec</groupId>
             <artifactId>commons-codec</artifactId>
@@ -72,11 +70,11 @@
             <artifactId>mina-filter-ssl</artifactId>
         </dependency>
 
-
         <dependency>
             <groupId>jmscts</groupId>
             <artifactId>jmscts</artifactId>
             <version>0.5-b2</version>
+            <scope>test</scope>
             <exclusions>
                 <exclusion>
                     <groupId>jms</groupId>
@@ -86,9 +84,12 @@
         </dependency>
 
         <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+        </dependency>
+        <dependency>
             <groupId>org.easymock</groupId>
             <artifactId>easymockclassextension</artifactId>
-            <scope>test</scope>
         </dependency>
 
     </dependencies>
@@ -106,7 +107,7 @@
                         </property>
                         <property>
                             <name>amqj.logging.level</name>
-                            <value>WARN</value>
+                            <value>${amqj.logging.level}</value>
                         </property>
                         <property>
                             <name>log4j.configuration</name>

Modified: incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java?view=diff&rev=488163&r1=488162&r2=488163
==============================================================================
--- incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java (original)
+++ incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java Mon Dec 18 01:10:59 2006
@@ -56,7 +56,7 @@
             {
                 //todo this list of valid transports should be enumerated somewhere
                 if ((!(transport.equalsIgnoreCase("vm") ||
-                        transport.equalsIgnoreCase("tcp"))))
+                       transport.equalsIgnoreCase("tcp"))))
                 {
                     if (transport.equalsIgnoreCase("localhost"))
                     {
@@ -65,7 +65,7 @@
                     }
                     else
                     {
-                        if (url.charAt(transport.length()) == ':' && url.charAt(transport.length()+1) != '/' )
+                        if (url.charAt(transport.length()) == ':' && url.charAt(transport.length() + 1) != '/')
                         {
                             //Then most likely we have a host:port value
                             connection = new URI(DEFAULT_TRANSPORT + "://" + url);
@@ -88,7 +88,7 @@
             if (transport == null)
             {
                 URLHelper.parseError(-1, "Unknown transport:'" + transport + "'" +
-                        " In broker URL:'" + url + "' Format: " + URL_FORMAT_EXAMPLE, "");
+                                         " In broker URL:'" + url + "' Format: " + URL_FORMAT_EXAMPLE, "");
             }
 
             setTransport(transport);
@@ -107,12 +107,45 @@
 
             if (port == -1)
             {
-                // Another fix for Java 1.5 URI handling
+                // Fix for when there is port data but it is not automatically parseable by getPort().
                 String auth = connection.getAuthority();
 
-                if (auth != null && auth.startsWith(":"))
+                if (auth != null && auth.contains(":"))
                 {
-                    setPort(Integer.parseInt(auth.substring(1)));
+                    int start = auth.indexOf(":") + 1;
+                    int end = start;
+                    boolean looking = true;
+                    boolean found = false;
+                    //Walk the authority looking for a port value.
+                    while (looking)
+                    {
+                        try
+                        {
+                            end++;
+                            Integer.parseInt(auth.substring(start, end));
+
+                            if (end >= auth.length())
+                            {
+                                looking = false;
+                                found = true;
+                            }
+                        }
+                        catch (NumberFormatException nfe)
+                        {
+                            looking = false;
+                        }
+
+                    }
+                    if (found)
+                    {
+                        setPort(Integer.parseInt(auth.substring(start, end)));
+                    }
+                    else
+                    {
+                        URLHelper.parseError(connection.toString().indexOf(connection.getAuthority()) + end - 1,
+                                             "Illegal character in port number", connection.toString());
+                    }
+
                 }
                 else
                 {
@@ -134,7 +167,7 @@
         {
             if (uris instanceof URLSyntaxException)
             {
-                throw (URLSyntaxException) uris;
+                throw(URLSyntaxException) uris;
             }
 
             URLHelper.parseError(uris.getIndex(), uris.getReason(), uris.getInput());
@@ -245,9 +278,9 @@
         BrokerDetails bd = (BrokerDetails) o;
 
         return _host.equalsIgnoreCase(bd.getHost()) &&
-                (_port == bd.getPort()) &&
-                _transport.equalsIgnoreCase(bd.getTransport()) &&
-                (useSSL() == bd.useSSL());
+               (_port == bd.getPort()) &&
+               _transport.equalsIgnoreCase(bd.getTransport()) &&
+               (useSSL() == bd.useSSL());
 
         //todo do we need to compare all the options as well?
     }

Modified: incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java?view=diff&rev=488163&r1=488162&r2=488163
==============================================================================
--- incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java (original)
+++ incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java Mon Dec 18 01:10:59 2006
@@ -44,6 +44,7 @@
 import org.apache.qpid.url.URLSyntaxException;
 
 import javax.jms.*;
+import javax.jms.IllegalStateException;
 import javax.naming.NamingException;
 import javax.naming.Reference;
 import javax.naming.Referenceable;
@@ -92,7 +93,7 @@
     /**
      * Maps from session id (Integer) to AMQSession instance
      */
-    private final Map _sessions = new LinkedHashMap(); //fixme this is map is replicated in amqprotocolsession as _channelId2SessionMap
+    private final Map _sessions = new LinkedHashMap(); //fixme this is map is replicated in amqprotocolsession as _channelId2SessionMap    
 
     private String _clientName;
 
@@ -142,7 +143,8 @@
                          String clientName, String virtualHost) throws AMQException, URLSyntaxException
     {
         this(new AMQConnectionURL(ConnectionURL.AMQ_PROTOCOL + "://" +
-                                  username + ":" + password + "@" + clientName +
+                                  username + ":" + password + "@" +
+                                  (clientName==null?"":clientName) +
                                   virtualHost + "?brokerlist='" + broker + "'"));
     }
 
@@ -157,11 +159,13 @@
     {
         this(new AMQConnectionURL(useSSL ?
                                   ConnectionURL.AMQ_PROTOCOL + "://" +
-                                  username + ":" + password + "@" + clientName +
+                                  username + ":" + password + "@" +
+                                  (clientName==null?"":clientName) +
                                   virtualHost + "?brokerlist='tcp://" + host + ":" + port + "'"
                                   + "," + ConnectionURL.OPTIONS_SSL + "='true'" :
                                                                                 ConnectionURL.AMQ_PROTOCOL + "://" +
-                                                                                username + ":" + password + "@" + clientName +
+                                                                                username + ":" + password + "@" +
+                                                                                (clientName==null?"":clientName) +
                                                                                 virtualHost + "?brokerlist='tcp://" + host + ":" + port + "'"
                                                                                 + "," + ConnectionURL.OPTIONS_SSL + "='false'"
         ));
@@ -537,7 +541,10 @@
     public void setClientID(String clientID) throws JMSException
     {
         checkNotClosed();
-        _clientName = clientID;
+        // in AMQP it is not possible to change the client ID. If one is not specified
+        // upon connection construction, an id is generated automatically. Therefore
+        // we can always throw an exception.
+        throw new IllegalStateException("Client name cannot be changed after being set");
     }
 
     public ConnectionMetaData getMetaData() throws JMSException
@@ -583,7 +590,6 @@
     public void stop() throws JMSException
     {
         checkNotClosed();
-
         if (_started)
         {
             for (Iterator i = _sessions.values().iterator(); i.hasNext();)
@@ -920,8 +926,8 @@
     void deregisterSession(int channelId)
     {
         _sessions.remove(channelId);
-    }
-
+    }    
+    
     /**
      * For all sessions, and for all consumers in those sessions, resubscribe. This is called during failover handling.
      * The caller must hold the failover mutex before calling this method.

Modified: incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?view=diff&rev=488163&r1=488162&r2=488163
==============================================================================
--- incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Mon Dec 18 01:10:59 2006
@@ -27,6 +27,7 @@
 import org.apache.qpid.client.message.AbstractJMSMessage;
 import org.apache.qpid.client.message.MessageFactoryRegistry;
 import org.apache.qpid.client.message.UnprocessedMessage;
+import org.apache.qpid.client.message.JMSStreamMessage;
 import org.apache.qpid.client.protocol.AMQProtocolHandler;
 import org.apache.qpid.client.util.FlowControllingBlockingQueue;
 import org.apache.qpid.framing.*;
@@ -367,13 +368,24 @@
 
     public StreamMessage createStreamMessage() throws JMSException
     {
-        checkNotClosed();
-        throw new UnsupportedOperationException("Stream messages not supported");
+        synchronized (_connection.getFailoverMutex()) // state check and message creation both happen under the failover mutex
+        {
+            checkNotClosed();
+
+            try
+            {
+                return (StreamMessage) _messageFactoryRegistry.createMessage(JMSStreamMessage.MIME_TYPE); // factory is looked up by the stream message MIME type
+            }
+            catch (AMQException e) // surface the AMQ failure to JMS callers; only its string form is carried over
+            {
+                throw new JMSException("Unable to create stream message: " + e);
+            }
+        }
     }
 
     public TextMessage createTextMessage() throws JMSException
     {
-        synchronized(_connection.getFailoverMutex())
+        synchronized (_connection.getFailoverMutex())
         {
             checkNotClosed();
 
@@ -462,28 +474,30 @@
         // that can be called from a different thread of control from the one controlling the session
         synchronized(_connection.getFailoverMutex())
         {
-            _closed.set(true);
-
-            // we pass null since this is not an error case
-            closeProducersAndConsumers(null);
-
-            try
+            //Ensure we only try and close an open session.
+            if (!_closed.getAndSet(true))
             {
-                _connection.getProtocolHandler().closeSession(this);
-                final AMQFrame frame = ChannelCloseBody.createAMQFrame(
-                        getChannelId(), AMQConstant.REPLY_SUCCESS.getCode(), "JMS client closing channel", 0, 0);
-                _connection.getProtocolHandler().syncWrite(frame, ChannelCloseOkBody.class);
-                // When control resumes at this point, a reply will have been received that
-                // indicates the broker has closed the channel successfully
+                // we pass null since this is not an error case
+                closeProducersAndConsumers(null);
 
-            }
-            catch (AMQException e)
-            {
-                throw new JMSException("Error closing session: " + e);
-            }
-            finally
-            {
-                _connection.deregisterSession(_channelId);
+                try
+                {
+                    _connection.getProtocolHandler().closeSession(this);
+                    final AMQFrame frame = ChannelCloseBody.createAMQFrame(
+                            getChannelId(), AMQConstant.REPLY_SUCCESS.getCode(), "JMS client closing channel", 0, 0);
+                    _connection.getProtocolHandler().syncWrite(frame, ChannelCloseOkBody.class);
+                    // When control resumes at this point, a reply will have been received that
+                    // indicates the broker has closed the channel successfully
+
+                }
+                catch (AMQException e)
+                {
+                    throw new JMSException("Error closing session: " + e);
+                }
+                finally
+                {
+                    _connection.deregisterSession(_channelId);
+                }
             }
         }
     }
@@ -723,6 +737,7 @@
 
     /**
      * Creates a QueueReceiver
+     *
      * @param destination
      * @return QueueReceiver - a wrapper around our MessageConsumer
      * @throws JMSException
@@ -736,6 +751,7 @@
 
     /**
      * Creates a QueueReceiver using a message selector
+     *
      * @param destination
      * @param messageSelector
      * @return QueueReceiver - a wrapper around our MessageConsumer
@@ -826,7 +842,7 @@
 
                 final AMQProtocolHandler protocolHandler = _connection.getProtocolHandler();
                 // TODO: construct the rawSelector from the selector string if rawSelector == null
-                final FieldTable ft = new FieldTable();
+                final FieldTable ft = FieldTableFactory.newFieldTable();
                 //if (rawSelector != null)
                 //    ft.put("headers", rawSelector.getDataAsBytes());
                 if (rawSelector != null)
@@ -935,6 +951,7 @@
 
     public Queue createQueue(String queueName) throws JMSException
     {
+    	checkNotClosed();
         if (queueName.indexOf('/') == -1)
         {
             return new AMQQueue(queueName);
@@ -957,12 +974,14 @@
 
     /**
      * Creates a QueueReceiver wrapping a MessageConsumer
+     *
      * @param queue
      * @return QueueReceiver
      * @throws JMSException
      */
     public QueueReceiver createReceiver(Queue queue) throws JMSException
     {
+    	checkNotClosed();
         AMQQueue dest = (AMQQueue) queue;
         BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(dest);
         return new QueueReceiverAdaptor(dest, consumer);
@@ -970,6 +989,7 @@
 
     /**
      * Creates a QueueReceiver wrapping a MessageConsumer using a message selector
+     *
      * @param queue
      * @param messageSelector
      * @return QueueReceiver
@@ -977,6 +997,7 @@
      */
     public QueueReceiver createReceiver(Queue queue, String messageSelector) throws JMSException
     {
+    	checkNotClosed();
         AMQQueue dest = (AMQQueue) queue;
         BasicMessageConsumer consumer = (BasicMessageConsumer)
                 createConsumer(dest, messageSelector);
@@ -985,11 +1006,15 @@
 
     public QueueSender createSender(Queue queue) throws JMSException
     {
-        return (QueueSender) createProducer(queue);
+    	checkNotClosed();
+        //return (QueueSender) createProducer(queue);
+        return new QueueSenderAdapter(createProducer(queue), queue);
     }
 
     public Topic createTopic(String topicName) throws JMSException
     {
+    	checkNotClosed();
+    	
         if (topicName.indexOf('/') == -1)
         {
             return new AMQTopic(topicName);
@@ -1012,18 +1037,21 @@
 
     /**
      * Creates a non-durable subscriber
+     *
      * @param topic
      * @return TopicSubscriber - a wrapper round our MessageConsumer
      * @throws JMSException
      */
     public TopicSubscriber createSubscriber(Topic topic) throws JMSException
     {
+    	checkNotClosed();
         AMQTopic dest = new AMQTopic(topic.getTopicName());
         return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest));
     }
 
     /**
      * Creates a non-durable subscriber with a message selector
+     *
      * @param topic
      * @param messageSelector
      * @param noLocal
@@ -1032,6 +1060,7 @@
      */
     public TopicSubscriber createSubscriber(Topic topic, String messageSelector, boolean noLocal) throws JMSException
     {
+    	checkNotClosed();
         AMQTopic dest = new AMQTopic(topic.getTopicName());
         return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest, messageSelector, noLocal));
     }
@@ -1045,6 +1074,7 @@
      */
     public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException
     {
+    	checkNotClosed();
         AMQTopic dest = new AMQTopic((AMQTopic) topic, _connection.getClientID(), name);
         return new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest));
     }
@@ -1055,6 +1085,7 @@
     public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal)
             throws JMSException
     {
+    	checkNotClosed();
         AMQTopic dest = new AMQTopic((AMQTopic) topic, _connection.getClientID(), name);
         BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(dest, messageSelector, noLocal);
         return new TopicSubscriberAdaptor(dest, consumer);
@@ -1062,26 +1093,32 @@
 
     public TopicPublisher createPublisher(Topic topic) throws JMSException
     {
-        return (TopicPublisher) createProducer(topic);
+    	checkNotClosed();
+        //return (TopicPublisher) createProducer(topic);
+        return new TopicPublisherAdapter(createProducer(topic), topic);
     }
 
     public QueueBrowser createBrowser(Queue queue) throws JMSException
     {
+    	checkNotClosed();
         throw new UnsupportedOperationException("Queue browsing not supported");
     }
 
     public QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException
     {
+    	checkNotClosed();
         throw new UnsupportedOperationException("Queue browsing not supported");
     }
 
     public TemporaryQueue createTemporaryQueue() throws JMSException
     {
+    	checkNotClosed();
         return new AMQTemporaryQueue();
     }
 
     public TemporaryTopic createTemporaryTopic() throws JMSException
     {
+    	checkNotClosed();
         return new AMQTemporaryTopic();
     }
 

Modified: incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?view=diff&rev=488163&r1=488162&r2=488163
==============================================================================
--- incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java (original)
+++ incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java Mon Dec 18 01:10:59 2006
@@ -159,11 +159,13 @@
 
     public String getMessageSelector() throws JMSException
     {
+    	checkPreConditions();
         return _messageSelector;
     }
 
     public MessageListener getMessageListener() throws JMSException
     {
+    	checkPreConditions();
         return (MessageListener) _messageListener.get();
     }
 
@@ -179,7 +181,7 @@
 
     public void setMessageListener(MessageListener messageListener) throws JMSException
     {
-        checkNotClosed();
+    	checkPreConditions();
 
         //if the current listener is non-null and the session is not stopped, then
         //it is an error to call this method.
@@ -277,7 +279,7 @@
 
     public Message receive(long l) throws JMSException
     {
-        checkNotClosed();
+    	checkPreConditions();
 
         acquireReceiving();
 
@@ -311,7 +313,7 @@
 
     public Message receiveNoWait() throws JMSException
     {
-        checkNotClosed();
+    	checkPreConditions();
 
         acquireReceiving();
 
@@ -520,7 +522,7 @@
      */
     private void deregisterConsumer()
     {
-        _session.deregisterConsumer(_consumerTag);
+    	_session.deregisterConsumer(_consumerTag);
     }
 
     public String getConsumerTag()
@@ -529,7 +531,20 @@
     }
 
     public void setConsumerTag(String consumerTag)
-    {
+    {    	
         _consumerTag = consumerTag;
     }
+
+	public AMQSession getSession() { // Accessor exposing the session stored in _session.
+		return _session;
+	}
+	
+	private void checkPreConditions() throws JMSException{ // Guard: the consumer must not be closed and its session must be present and open.
+    	
+		this.checkNotClosed(); // closed-state check on this consumer itself
+		
+		if(_session == null || _session.isClosed()){ // a missing session and a closed session are rejected alike
+			throw new UnsupportedOperationException("Invalid Session"); // NOTE(review): unchecked exception, not a JMSException -- confirm callers that only catch JMSException are meant to miss it
+		}
+	}
 }

Modified: incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java?view=diff&rev=488163&r1=488162&r2=488163
==============================================================================
--- incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java (original)
+++ incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java Mon Dec 18 01:10:59 2006
@@ -143,6 +143,7 @@
 
     public void setDisableMessageID(boolean b) throws JMSException
     {
+    	checkPreConditions();
         checkNotClosed();
         // IGNORED
     }
@@ -156,7 +157,7 @@
 
     public void setDisableMessageTimestamp(boolean b) throws JMSException
     {
-        checkNotClosed();
+    	checkPreConditions();
         _disableTimestamps = b;
     }
 
@@ -168,7 +169,7 @@
 
     public void setDeliveryMode(int i) throws JMSException
     {
-        checkNotClosed();
+    	checkPreConditions();
         if (i != DeliveryMode.NON_PERSISTENT && i != DeliveryMode.PERSISTENT)
         {
             throw new JMSException("DeliveryMode must be either NON_PERSISTENT or PERSISTENT. Value of " + i +
@@ -185,7 +186,7 @@
 
     public void setPriority(int i) throws JMSException
     {
-        checkNotClosed();
+    	checkPreConditions();
         if (i < 0 || i > 9)
         {
             throw new IllegalArgumentException("Priority of " + i + " is illegal. Value must be in range 0 to 9");
@@ -201,7 +202,7 @@
 
     public void setTimeToLive(long l) throws JMSException
     {
-        checkNotClosed();
+    	checkPreConditions();
         if (l < 0)
         {
             throw new IllegalArgumentException("Time to live must be non-negative - supplied value was " + l);
@@ -229,6 +230,7 @@
 
     public void send(Message message) throws JMSException
     {
+    	checkPreConditions();
         synchronized (_connection.getFailoverMutex())
         {
             sendImpl(_destination, (AbstractJMSMessage) message, _deliveryMode, _messagePriority, _timeToLive,
@@ -238,6 +240,7 @@
 
     public void send(Message message, int deliveryMode) throws JMSException
     {
+    	checkPreConditions();
         synchronized (_connection.getFailoverMutex())
         {
             sendImpl(_destination, (AbstractJMSMessage) message, deliveryMode, _messagePriority, _timeToLive,
@@ -247,6 +250,7 @@
 
     public void send(Message message, int deliveryMode, boolean immediate) throws JMSException
     {
+    	checkPreConditions();
         synchronized (_connection.getFailoverMutex())
         {
             sendImpl(_destination, (AbstractJMSMessage) message, deliveryMode, _messagePriority, _timeToLive,
@@ -257,6 +261,7 @@
     public void send(Message message, int deliveryMode, int priority,
                      long timeToLive) throws JMSException
     {
+    	checkPreConditions();
         synchronized (_connection.getFailoverMutex())
         {
             sendImpl(_destination, (AbstractJMSMessage)message, deliveryMode, priority, timeToLive, _mandatory,
@@ -266,7 +271,7 @@
 
     public void send(Destination destination, Message message) throws JMSException
     {
-        checkNotClosed();
+    	checkPreConditions();
         synchronized (_connection.getFailoverMutex())
         {
             validateDestination(destination);
@@ -279,7 +284,7 @@
                      int priority, long timeToLive)
             throws JMSException
     {
-        checkNotClosed();
+    	checkPreConditions();
         synchronized (_connection.getFailoverMutex())
         {
             validateDestination(destination);
@@ -292,7 +297,7 @@
                      int priority, long timeToLive, boolean mandatory)
             throws JMSException
     {
-        checkNotClosed();
+        checkPreConditions();
         synchronized (_connection.getFailoverMutex())
         {
             validateDestination(destination);
@@ -305,7 +310,7 @@
                      int priority, long timeToLive, boolean mandatory, boolean immediate)
             throws JMSException
     {
-        checkNotClosed();
+    	checkPreConditions();
         synchronized (_connection.getFailoverMutex())
         {
             validateDestination(destination);
@@ -319,7 +324,7 @@
                      boolean immediate, boolean waitUntilSent)
             throws JMSException
     {
-        checkNotClosed();
+    	checkPreConditions();
         synchronized (_connection.getFailoverMutex())
         {
             validateDestination(destination);
@@ -334,7 +339,7 @@
         {
             throw new JMSException("Unsupported destination class: " +
                                    (destination != null ? destination.getClass() : null));
-        }
+        }        
         declareDestination((AMQDestination)destination);
     }
 
@@ -481,4 +486,20 @@
         checkNotClosed();
         _encoding = encoding;
     }
+    
+	private void checkPreConditions() throws IllegalStateException, JMSException { // Guard: producer not closed, default destination set, session present and open.
+		checkNotClosed(); // closed-state check on this producer itself
+		
+		if(_destination == null){ // NOTE(review): this guard also runs in the send(Destination, ...) overloads, so a producer without a default destination is rejected there too -- confirm intended
+			throw new UnsupportedOperationException("Destination is null");
+		}
+		
+		if(_session == null || _session.isClosed()){ // a missing session and a closed session are rejected alike
+			throw new UnsupportedOperationException("Invalid Session"); // unchecked; not covered by the declared JMSException
+		}
+	}
+
+	public AMQSession getSession() { // Accessor exposing the session stored in _session.
+		return _session;
+	}
 }

Modified: incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/QueueReceiverAdaptor.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/QueueReceiverAdaptor.java?view=diff&rev=488163&r1=488162&r2=488163
==============================================================================
--- incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/QueueReceiverAdaptor.java (original)
+++ incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/QueueReceiverAdaptor.java Mon Dec 18 01:10:59 2006
@@ -39,31 +39,37 @@
 
     public String getMessageSelector() throws JMSException
     {
+    	checkPreConditions();
         return _consumer.getMessageSelector();
     }
 
     public MessageListener getMessageListener() throws JMSException
     {
+    	checkPreConditions();
         return _consumer.getMessageListener();
     }
 
     public void setMessageListener(MessageListener messageListener) throws JMSException
     {
+    	checkPreConditions();
        _consumer.setMessageListener(messageListener);
     }
 
     public Message receive() throws JMSException
     {
+    	checkPreConditions();
         return _consumer.receive();
     }
 
     public Message receive(long l) throws JMSException
     {
+    	checkPreConditions();
         return _consumer.receive(l);
     }
 
     public Message receiveNoWait() throws JMSException
     {
+    	checkPreConditions();
         return _consumer.receiveNoWait();
     }
 
@@ -79,8 +85,26 @@
      */
     public Queue getQueue() throws JMSException
     {
+    	checkPreConditions();
        return _queue;
     }
 
+    private void checkPreConditions() throws javax.jms.IllegalStateException { // Guard: wrapped consumer open, queue set, consumer's session present and open.
+    	BasicMessageConsumer msgConsumer = (BasicMessageConsumer)_consumer; // cast needed to reach isClosed()/getSession(); fails with ClassCastException for any other consumer type
+    	
+    	if (msgConsumer.isClosed() ){
+			throw new javax.jms.IllegalStateException("Consumer is closed"); // the only failure here reported as a JMS exception
+		}
+		
+		if(_queue == null){
+			throw new UnsupportedOperationException("Queue is null"); // unchecked; not covered by the declared throws clause
+		}
+		
+		AMQSession session = msgConsumer.getSession(); // session is taken from the wrapped consumer, not held by this adaptor
+		
+		if(session == null || session.isClosed()){ // a missing session and a closed session are rejected alike
+			throw new UnsupportedOperationException("Invalid Session"); // NOTE(review): unchecked, unlike the closed-consumer case above -- confirm intended
+		}
+	}
 
 }

Modified: incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/TopicSubscriberAdaptor.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/TopicSubscriberAdaptor.java?view=diff&rev=488163&r1=488162&r2=488163
==============================================================================
--- incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/TopicSubscriberAdaptor.java (original)
+++ incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/TopicSubscriberAdaptor.java Mon Dec 18 01:10:59 2006
@@ -20,6 +20,7 @@
  */
 package org.apache.qpid.client;
 
+import javax.jms.IllegalStateException;
 import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
@@ -43,37 +44,45 @@
         _consumer = consumer;
         _noLocal = noLocal;
     }
+    
     TopicSubscriberAdaptor(Topic topic, BasicMessageConsumer consumer)
     {
         this(topic, consumer, consumer.isNoLocal());
     }
+    
     public Topic getTopic() throws JMSException
     {
+    	checkPreConditions();
         return _topic;
     }
 
     public boolean getNoLocal() throws JMSException
     {
+    	checkPreConditions();
         return _noLocal;
     }
 
     public String getMessageSelector() throws JMSException
     {
+    	checkPreConditions();
         return _consumer.getMessageSelector();
     }
 
     public MessageListener getMessageListener() throws JMSException
     {
+    	checkPreConditions();
         return _consumer.getMessageListener();
     }
 
     public void setMessageListener(MessageListener messageListener) throws JMSException
     {
+    	checkPreConditions();
         _consumer.setMessageListener(messageListener);
     }
 
     public Message receive() throws JMSException
     {
+    	checkPreConditions();
         return _consumer.receive();
     }
 
@@ -84,6 +93,7 @@
 
     public Message receiveNoWait() throws JMSException
     {
+    	checkPreConditions();
         return _consumer.receiveNoWait();
     }
 
@@ -91,4 +101,22 @@
     {
         _consumer.close();
     }
+    
+    private void checkPreConditions() throws javax.jms.IllegalStateException{ // Guard: wrapped consumer open, topic set, consumer's session present and open.
+    	BasicMessageConsumer msgConsumer = (BasicMessageConsumer)_consumer; // cast needed to reach isClosed()/getSession(); fails with ClassCastException for any other consumer type
+    	
+    	if (msgConsumer.isClosed() ){
+			throw new javax.jms.IllegalStateException("Consumer is closed"); // the only failure here reported as a JMS exception
+		}
+		
+		if(_topic == null){
+			throw new UnsupportedOperationException("Topic is null"); // unchecked; not covered by the declared throws clause
+		}
+		
+		AMQSession session = msgConsumer.getSession(); // session is taken from the wrapped consumer, not held by this adaptor
+		
+		if(session == null || session.isClosed()){ // a missing session and a closed session are rejected alike
+			throw new UnsupportedOperationException("Invalid Session"); // NOTE(review): unchecked, unlike the closed-consumer case above -- confirm intended
+		}
+	}
 }

Modified: incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java?view=diff&rev=488163&r1=488162&r2=488163
==============================================================================
--- incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java (original)
+++ incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java Mon Dec 18 01:10:59 2006
@@ -32,6 +32,7 @@
 import org.apache.qpid.framing.ConnectionStartBody;
 import org.apache.qpid.framing.ConnectionStartOkBody;
 import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.FieldTableFactory;
 
 import javax.security.sasl.Sasl;
 import javax.security.sasl.SaslClient;
@@ -117,7 +118,7 @@
             }
 
             stateManager.changeState(AMQState.CONNECTION_NOT_TUNED);
-            FieldTable clientProperties = new FieldTable();
+            FieldTable clientProperties = FieldTableFactory.newFieldTable();
             clientProperties.put("instance", ps.getClientID());
             clientProperties.put("product", "Qpid");
             clientProperties.put("version", "1.0");

Modified: incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java?view=diff&rev=488163&r1=488162&r2=488163
==============================================================================
--- incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java (original)
+++ incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java Mon Dec 18 01:10:59 2006
@@ -27,6 +27,7 @@
 import javax.jms.BytesMessage;
 import javax.jms.JMSException;
 import javax.jms.MessageFormatException;
+import javax.jms.MessageEOFException;
 import java.nio.charset.CharacterCodingException;
 import java.nio.charset.Charset;