You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2011/09/04 21:21:45 UTC

svn commit: r1165093 - /qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/

Author: rgodfrey
Date: Sun Sep  4 19:21:45 2011
New Revision: 1165093

URL: http://svn.apache.org/viewvc?rev=1165093&view=rev
Log:
NO-JIRA: JMS Fixes

Modified:
    qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionFactoryImpl.java
    qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java
    qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionMetaDataImpl.java
    qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java
    qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageFactory.java
    qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageImpl.java
    qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageProducerImpl.java
    qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ObjectMessageImpl.java
    qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueReceiverImpl.java
    qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java
    qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/StreamMessageImpl.java

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionFactoryImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionFactoryImpl.java?rev=1165093&r1=1165092&r2=1165093&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionFactoryImpl.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionFactoryImpl.java Sun Sep  4 19:21:45 2011
@@ -24,10 +24,14 @@ import org.apache.qpid.amqp_1_0.jms.Conn
 import org.apache.qpid.amqp_1_0.jms.ConnectionFactory;
 
 import javax.jms.JMSException;
+import javax.jms.QueueConnection;
+import javax.jms.QueueConnectionFactory;
+import javax.jms.TopicConnection;
+import javax.jms.TopicConnectionFactory;
 import java.net.MalformedURLException;
 import java.net.URL;
 
-public class ConnectionFactoryImpl implements ConnectionFactory
+public class ConnectionFactoryImpl implements ConnectionFactory, TopicConnectionFactory, QueueConnectionFactory
 {
     private String _host;
     private int _port;
@@ -48,12 +52,12 @@ public class ConnectionFactoryImpl imple
         _clientId = clientId;
     }
 
-    public Connection createConnection() throws JMSException
+    public ConnectionImpl createConnection() throws JMSException
     {
         return new ConnectionImpl(_host, _port, _username, _password, _clientId);
     }
 
-    public Connection createConnection(final String username, final String password) throws JMSException
+    public ConnectionImpl createConnection(final String username, final String password) throws JMSException
     {
         return new ConnectionImpl(_host, _port, username, password, _clientId);
     }
@@ -97,4 +101,24 @@ public class ConnectionFactoryImpl imple
         return new ConnectionFactoryImpl(host, port, username, password, clientId);
 
     }
+
+    public QueueConnection createQueueConnection() throws JMSException
+    {
+        return createConnection();
+    }
+
+    public QueueConnection createQueueConnection(final String username, final String password) throws JMSException
+    {
+        return createConnection(username, password);
+    }
+
+    public TopicConnection createTopicConnection() throws JMSException
+    {
+        return createConnection();
+    }
+
+    public TopicConnection createTopicConnection(final String username, final String password) throws JMSException
+    {
+        return createConnection(username, password);
+    }
 }

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java?rev=1165093&r1=1165092&r2=1165093&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java Sun Sep  4 19:21:45 2011
@@ -22,16 +22,12 @@ import org.apache.qpid.amqp_1_0.jms.Conn
 import org.apache.qpid.amqp_1_0.jms.ConnectionMetaData;
 import org.apache.qpid.amqp_1_0.jms.Session;
 
-import javax.jms.ConnectionConsumer;
-import javax.jms.Destination;
-import javax.jms.ExceptionListener;
-import javax.jms.JMSException;
-import javax.jms.ServerSessionPool;
-import javax.jms.Topic;
+import javax.jms.*;
+import javax.jms.IllegalStateException;
 import java.util.ArrayList;
 import java.util.List;
 
-public class ConnectionImpl implements Connection
+public class ConnectionImpl implements Connection, QueueConnection, TopicConnection
 {
 
     private ConnectionMetaData _connectionMetaData;
@@ -97,7 +93,7 @@ public class ConnectionImpl implements C
         {
             if(_state == State.CLOSED)
             {
-                throw new JMSException("Cannot create a session on a closed connection");
+                throw new IllegalStateException("Cannot create a session on a closed connection");
             }
 
             SessionImpl session = new SessionImpl(this, acknowledgeMode);
@@ -111,6 +107,7 @@ public class ConnectionImpl implements C
 
     public String getClientID() throws JMSException
     {
+        checkClosed();
         return null;  //TODO
     }
 
@@ -123,16 +120,19 @@ public class ConnectionImpl implements C
 
     public ConnectionMetaData getMetaData() throws JMSException
     {
+        checkClosed();
         return _connectionMetaData;
     }
 
     public ExceptionListener getExceptionListener() throws JMSException
     {
+        checkClosed();
         return _exceptionListener;
     }
 
     public void setExceptionListener(final ExceptionListener exceptionListener) throws JMSException
     {
+        checkClosed();
         _exceptionListener = exceptionListener;
     }
 
@@ -140,7 +140,7 @@ public class ConnectionImpl implements C
     {
         synchronized(_lock)
         {
-
+            checkClosed();
             if(_state == State.STOPPED)
             {
                 // TODO
@@ -153,6 +153,7 @@ public class ConnectionImpl implements C
                 }
 
             }
+
             _lock.notifyAll();
         }
 
@@ -172,7 +173,7 @@ public class ConnectionImpl implements C
                     _state = State.STOPPED;
                     break;
                 case CLOSED:
-                    //TODO
+                    throw new javax.jms.IllegalStateException("Closed");
             }
 
             _lock.notifyAll();
@@ -198,11 +199,33 @@ public class ConnectionImpl implements C
         }
     }
 
+    private void checkClosed() throws IllegalStateException
+    {
+        if(_state == State.CLOSED)
+            throw new IllegalStateException("Closed");
+    }
+
     public ConnectionConsumer createConnectionConsumer(final Destination destination,
                                                        final String s,
                                                        final ServerSessionPool serverSessionPool,
                                                        final int i) throws JMSException
     {
+        checkClosed();
+        return null;  //TODO
+    }
+
+    public TopicSession createTopicSession(final boolean transacted, final int acknowledgeMode) throws JMSException
+    {
+        checkClosed();
+        return createSession(transacted, acknowledgeMode);
+    }
+
+    public ConnectionConsumer createConnectionConsumer(final Topic topic,
+                                                       final String s,
+                                                       final ServerSessionPool serverSessionPool,
+                                                       final int i) throws JMSException
+    {
+        checkClosed();
         return null;  //TODO
     }
 
@@ -212,9 +235,27 @@ public class ConnectionImpl implements C
                                                               final ServerSessionPool serverSessionPool,
                                                               final int i) throws JMSException
     {
+        checkClosed();
         return null;  //TODO
     }
 
+    public QueueSession createQueueSession(final boolean transacted, final int acknowledgeMode) throws JMSException
+    {
+        checkClosed();
+        return createSession(transacted, acknowledgeMode);
+    }
+
+    public ConnectionConsumer createConnectionConsumer(final Queue queue,
+                                                       final String s,
+                                                       final ServerSessionPool serverSessionPool,
+                                                       final int i) throws JMSException
+    {
+        checkClosed();
+        return null;  //TODO
+    }
+
+
+
     protected org.apache.qpid.amqp_1_0.client.Connection getClientConnection()
     {
         return _conn;

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionMetaDataImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionMetaDataImpl.java?rev=1165093&r1=1165092&r2=1165093&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionMetaDataImpl.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionMetaDataImpl.java Sun Sep  4 19:21:45 2011
@@ -21,6 +21,9 @@ package org.apache.qpid.amqp_1_0.jms.imp
 import org.apache.qpid.amqp_1_0.jms.ConnectionMetaData;
 
 import javax.jms.JMSException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.Enumeration;
 
 public class ConnectionMetaDataImpl implements ConnectionMetaData
@@ -35,6 +38,7 @@ public class ConnectionMetaDataImpl impl
     private final int _amqpMajorVersion;
     private final int _amqpMinorVersion;
     private final int _amqpRevisionVersion;
+    private static final Collection<String> _jmsxProperties = Arrays.asList("JMSXGroupID", "JMSXGroupSeq");
 
     public ConnectionMetaDataImpl(final int amqpMajorVersion, final int amqpMinorVersion, final int amqpRevisionVersion)
     {
@@ -80,7 +84,8 @@ public class ConnectionMetaDataImpl impl
 
     public Enumeration getJMSXPropertyNames() throws JMSException
     {
-        return null;  //TODO
+
+        return Collections.enumeration(_jmsxProperties);
     }
 
     public int getAMQPMajorVersion()

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java?rev=1165093&r1=1165092&r2=1165093&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java Sun Sep  4 19:21:45 2011
@@ -21,15 +21,17 @@ package org.apache.qpid.amqp_1_0.jms.imp
 import org.apache.qpid.amqp_1_0.client.Message;
 import org.apache.qpid.amqp_1_0.client.Receiver;
 import org.apache.qpid.amqp_1_0.jms.MessageConsumer;
+import org.apache.qpid.amqp_1_0.jms.QueueReceiver;
+import org.apache.qpid.amqp_1_0.jms.Queue;
+import org.apache.qpid.amqp_1_0.jms.Topic;
+import org.apache.qpid.amqp_1_0.jms.TopicSubscriber;
 import org.apache.qpid.amqp_1_0.type.Binary;
 import org.apache.qpid.amqp_1_0.type.UnsignedInteger;
 
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.MessageListener;
-import javax.jms.Session;
+import javax.jms.*;
+import javax.jms.IllegalStateException;
 
-public class MessageConsumerImpl implements MessageConsumer
+public class MessageConsumerImpl implements MessageConsumer, QueueReceiver, TopicSubscriber
 {
     private String _selector;
     private boolean _noLocal;
@@ -37,7 +39,9 @@ public class MessageConsumerImpl impleme
     private SessionImpl _session;
     private Receiver _receiver;
     private Binary _lastUnackedMessage;
-    private MessageListener _messageListener;
+    MessageListener _messageListener;
+
+    private boolean _closed = false;
 
     MessageConsumerImpl(final Destination destination,
                         final SessionImpl session,
@@ -67,16 +71,19 @@ public class MessageConsumerImpl impleme
 
     public String getMessageSelector() throws JMSException
     {
+        checkClosed();
         return _selector;
     }
 
-    public MessageListener getMessageListener()
+    public MessageListener getMessageListener() throws IllegalStateException
     {
+        checkClosed();
         return _messageListener;
     }
 
     public void setMessageListener(final MessageListener messageListener) throws JMSException
     {
+        checkClosed();
         _messageListener = messageListener;
         _session.messageListenerSet( this );
         _receiver.setMessageArrivalListener(new Receiver.MessageArrivalListener()
@@ -91,11 +98,13 @@ public class MessageConsumerImpl impleme
 
     public MessageImpl receive() throws JMSException
     {
+        checkClosed();
         return receiveImpl(-1L);
     }
 
     public MessageImpl receive(final long timeout) throws JMSException
     {
+        checkClosed();
         // TODO - validate timeout > 0
 
         return receiveImpl(timeout);
@@ -103,10 +112,11 @@ public class MessageConsumerImpl impleme
 
     public MessageImpl receiveNoWait() throws JMSException
     {
+        checkClosed();
         return receiveImpl(0L);
     }
 
-    private MessageImpl receiveImpl(long timeout)
+    private MessageImpl receiveImpl(long timeout) throws IllegalStateException
     {
         org.apache.qpid.amqp_1_0.client.Message msg = receive0(timeout);
         if(msg != null)
@@ -142,7 +152,21 @@ public class MessageConsumerImpl impleme
 
     public void close() throws JMSException
     {
-        //TODO
+        if(!_closed)
+        {
+            _closed = true;
+
+            _receiver.close();
+
+        }
+    }
+
+    private void checkClosed() throws IllegalStateException
+    {
+        if(_closed)
+        {
+            throw new javax.jms.IllegalStateException("Closed");
+        }
     }
 
     void setLastUnackedMessage(final Binary deliveryTag)
@@ -150,7 +174,7 @@ public class MessageConsumerImpl impleme
         _lastUnackedMessage = deliveryTag;
     }
 
-    void preReceiveAction(final org.apache.qpid.amqp_1_0.client.Message msg)
+    void preReceiveAction(final org.apache.qpid.amqp_1_0.client.Message msg) throws IllegalStateException
     {
         final int acknowledgeMode = _session.getAcknowledgeMode();
 
@@ -173,19 +197,22 @@ public class MessageConsumerImpl impleme
         }
     }
 
-    public DestinationImpl getDestination()
+    public DestinationImpl getDestination() throws IllegalStateException
     {
+        checkClosed();
         return _destination;
     }
 
 
-    public SessionImpl getSession()
+    public SessionImpl getSession() throws IllegalStateException
     {
+        checkClosed();
         return _session;
     }
 
-    public boolean getNoLocal()
+    public boolean getNoLocal() throws IllegalStateException
     {
+        checkClosed();
         return _noLocal;
     }
 
@@ -193,4 +220,14 @@ public class MessageConsumerImpl impleme
     {
         _receiver.setCredit(UnsignedInteger.valueOf(100), true);
     }
+
+    public Queue getQueue() throws JMSException
+    {
+        return (Queue) getDestination();
+    }
+
+    public Topic getTopic() throws JMSException
+    {
+        return (Topic) getDestination();
+    }
 }

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageFactory.java?rev=1165093&r1=1165092&r2=1165093&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageFactory.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageFactory.java Sun Sep  4 19:21:45 2011
@@ -20,10 +20,15 @@
 package org.apache.qpid.amqp_1_0.jms.impl;
 
 import org.apache.qpid.amqp_1_0.client.Message;
+import org.apache.qpid.amqp_1_0.type.Binary;
 import org.apache.qpid.amqp_1_0.type.Section;
 import org.apache.qpid.amqp_1_0.type.messaging.*;
 import org.apache.qpid.amqp_1_0.type.messaging.Properties;
 
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.Serializable;
 import java.util.*;
 
 class MessageFactory
@@ -95,7 +100,32 @@ class MessageFactory
             }
             else if(bodySection instanceof Data)
             {
-                message = new BytesMessageImpl(header, properties, appProperties, (Data) bodySection, footer, _session);
+                if(properties != null && ObjectMessageImpl.CONTENT_TYPE.equals(properties.getContentType()))
+                {
+
+                    Serializable serializable = null;
+                    Binary data = ((Data) bodySection).getValue();
+
+                    try
+                    {
+                        ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(data.getArray(), data.getArrayOffset(), data.getLength()));
+                        serializable = (Serializable) ois.readObject();
+                    }
+                    catch (IOException e)
+                    {
+                        e.printStackTrace();
+                    }
+                    catch (ClassNotFoundException e)
+                    {
+                        e.printStackTrace();  //TODO
+                    }
+
+                    message = new ObjectMessageImpl(header, properties, appProperties, serializable, footer, _session);
+                }
+                else
+                {
+                    message = new BytesMessageImpl(header, properties, appProperties, (Data) bodySection, footer, _session);
+                }
             }
             else if(bodySection instanceof AmqpSequence)
             {
@@ -152,6 +182,8 @@ class MessageFactory
             message = new AmqpMessageImpl(header,properties,appProperties,body,footer, _session);
         }
 
+        message.setReadOnly();
+
         return message;
     }
 }

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageImpl.java?rev=1165093&r1=1165092&r2=1165093&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageImpl.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageImpl.java Sun Sep  4 19:21:45 2011
@@ -46,12 +46,17 @@ import java.util.*;
 
 public abstract class MessageImpl implements Message
 {
+    static final Set<Class> _supportedClasses =
+                new HashSet<Class>(Arrays.asList(Boolean.class, Byte.class, Short.class, Integer.class, Long.class,
+                                                 Float.class, Double.class, Character.class, String.class, byte[].class));
+
     private Header _header;
     private Properties _properties;
     private ApplicationProperties _applicationProperties;
     private Footer _footer;
     public static final Charset UTF_8_CHARSET = Charset.forName("UTF-8");
     private SessionImpl _sessionImpl;
+    private boolean _readOnly;
 
     protected MessageImpl(Header header,
                           Properties properties,
@@ -133,13 +138,13 @@ public abstract class MessageImpl implem
 
     public void setJMSCorrelationID(String s) throws JMSException
     {
-        getProperties().setCorrelationId(new Binary(s.getBytes()));
+        getProperties().setCorrelationId(s == null ? null : new Binary(s.getBytes()));
     }
 
     public String getJMSCorrelationID() throws JMSException
     {
         final Binary id = (Binary) getProperties().getCorrelationId();
-        return new String(id.getArray(), id.getArrayOffset(), id.getLength());
+        return id == null ? null : new String(id.getArray(), id.getArrayOffset(), id.getLength());
     }
 
     public DestinationImpl getJMSReplyTo() throws JMSException
@@ -253,12 +258,24 @@ public abstract class MessageImpl implem
 
     public long getJMSExpiration() throws JMSException
     {
-        return 0;  //TODO
+        final UnsignedInteger ttl = getTtl();
+        return ttl == null || ttl.longValue() == 0 ? 0 : getJMSTimestamp() + ttl.longValue();
     }
 
     public void setJMSExpiration(long l) throws JMSException
     {
-        //TODO
+        if(l == 0)
+        {
+            setTtl(UnsignedInteger.ZERO);
+        }
+        else
+        {
+            if(getTransmitTime() == null)
+            {
+                setTransmitTime(new Date());
+            }
+            setTtl(UnsignedInteger.valueOf(l - getTransmitTime().getDate()));
+        }
     }
 
     public int getJMSPriority() throws JMSException
@@ -281,7 +298,7 @@ public abstract class MessageImpl implem
 
     public void clearProperties() throws JMSException
     {
-        //TODO
+        _applicationProperties.getValue().clear();
     }
 
     public boolean propertyExists(final String s) throws JMSException
@@ -674,53 +691,62 @@ public abstract class MessageImpl implem
 
     public void setBooleanProperty(final String s, final boolean b) throws JMSException
     {
+        checkWritable();
         setBooleanProperty((Object)s, b);
     }
 
     public void setByteProperty(final String s, final byte b) throws JMSException
     {
+        checkWritable();
         setByteProperty((Object)s, b);
     }
 
     public void setShortProperty(final String s, final short i) throws JMSException
     {
+        checkWritable();
         setShortProperty((Object)s, i);
     }
 
     public void setIntProperty(final String s, final int i) throws JMSException
     {
+        checkWritable();
         setIntProperty((Object)s, i);
     }
 
     public void setLongProperty(final String s, final long l) throws JMSException
     {
+        checkWritable();
         setLongProperty((Object)s, l);
     }
 
     public void setFloatProperty(final String s, final float v) throws JMSException
     {
+        checkWritable();
         setFloatProperty((Object) s, v);
     }
 
     public void setDoubleProperty(final String s, final double v) throws JMSException
     {
+        checkWritable();
         setDoubleProperty((Object)s, v);
     }
 
     public void setStringProperty(final String s, final String s1) throws JMSException
     {
+        checkWritable();
         setStringProperty((Object)s, s1);
     }
 
     public void setObjectProperty(final String s, final Object o) throws JMSException
     {
-        if(o != null && (o.getClass().isPrimitive() || o instanceof String))
+        checkWritable();
+        if(o != null && (_supportedClasses.contains(o.getClass())))
         {
             setObjectProperty((Object)s, o);
         }
         else
         {
-            throw new JMSException("Cannot call setObjectProperty with a value of " + ((o == null) ? "null" : " class "+o.getClass().getName()) + ".");
+            throw new MessageFormatException("Cannot call setObjectProperty with a value of " + ((o == null) ? "null" : " class "+o.getClass().getName()) + ".");
         }
     }
 
@@ -958,12 +984,12 @@ public abstract class MessageImpl implem
 
     public void clearBody() throws JMSException
     {
-        //TODO
+        _readOnly = false;
     }
 
     protected boolean isReadOnly()
     {
-        return false;  //TODO
+        return _readOnly;
     }
 
     protected void checkReadable() throws MessageNotReadableException
@@ -982,6 +1008,11 @@ public abstract class MessageImpl implem
         }
     }
 
+    public void setReadOnly()
+    {
+        _readOnly = true;
+    }
+
     private static class InvalidJMSMEssageIdException extends JMSException
     {
         public InvalidJMSMEssageIdException(String messageId)

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageProducerImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageProducerImpl.java?rev=1165093&r1=1165092&r2=1165093&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageProducerImpl.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageProducerImpl.java Sun Sep  4 19:21:45 2011
@@ -20,15 +20,17 @@ package org.apache.qpid.amqp_1_0.jms.imp
 
 import org.apache.qpid.amqp_1_0.client.Sender;
 import org.apache.qpid.amqp_1_0.jms.MessageProducer;
+import org.apache.qpid.amqp_1_0.jms.Queue;
+import org.apache.qpid.amqp_1_0.jms.QueueSender;
+import org.apache.qpid.amqp_1_0.jms.TopicPublisher;
 import org.apache.qpid.amqp_1_0.type.Binary;
 import org.apache.qpid.amqp_1_0.type.UnsignedInteger;
 
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Message;
+import javax.jms.*;
+import javax.jms.IllegalStateException;
 import java.util.UUID;
 
-public class MessageProducerImpl implements MessageProducer
+public class MessageProducerImpl implements MessageProducer, QueueSender, TopicPublisher
 {
     private boolean _disableMessageID;
     private boolean _disableMessageTimestamp;
@@ -39,6 +41,7 @@ public class MessageProducerImpl impleme
     private DestinationImpl _destination;
     private SessionImpl _session;
     private Sender _sender;
+    private boolean _closed;
 
     protected MessageProducerImpl(final Destination destination,
                                final SessionImpl session) throws JMSException
@@ -67,64 +70,97 @@ public class MessageProducerImpl impleme
         }
     }
 
-    public boolean getDisableMessageID()
+    private void checkClosed() throws IllegalStateException
     {
+        if(_closed)
+        {
+            throw new javax.jms.IllegalStateException("Producer closed");
+        }
+    }
+
+    public boolean getDisableMessageID() throws IllegalStateException
+    {
+        checkClosed();
         return _disableMessageID;
     }
 
-    public void setDisableMessageID(final boolean disableMessageID)
+    public void setDisableMessageID(final boolean disableMessageID) throws IllegalStateException
     {
+        checkClosed();
         _disableMessageID = disableMessageID;
     }
 
-    public boolean getDisableMessageTimestamp()
+    public boolean getDisableMessageTimestamp() throws IllegalStateException
     {
+        checkClosed();
         return _disableMessageTimestamp;
     }
 
-    public void setDisableMessageTimestamp(final boolean disableMessageTimestamp)
+    public void setDisableMessageTimestamp(final boolean disableMessageTimestamp) throws IllegalStateException
     {
+        checkClosed();
         _disableMessageTimestamp = disableMessageTimestamp;
     }
 
-    public int getDeliveryMode()
+    public int getDeliveryMode() throws IllegalStateException
     {
+        checkClosed();
         return _deliveryMode;
     }
 
-    public void setDeliveryMode(final int deliveryMode)
+    public void setDeliveryMode(final int deliveryMode) throws IllegalStateException
     {
+        checkClosed();
         _deliveryMode = deliveryMode;
     }
 
-    public int getPriority()
+    public int getPriority() throws IllegalStateException
     {
+        checkClosed();
         return _priority;
     }
 
-    public void setPriority(final int priority)
+    public void setPriority(final int priority) throws IllegalStateException
     {
+        checkClosed();
         _priority = priority;
     }
 
-    public long getTimeToLive()
+    public long getTimeToLive() throws IllegalStateException
     {
+        checkClosed();
         return _timeToLive;
     }
 
-    public void setTimeToLive(final long timeToLive)
+    public void setTimeToLive(final long timeToLive) throws IllegalStateException
     {
+        checkClosed();
         _timeToLive = timeToLive;
     }
 
     public DestinationImpl getDestination() throws JMSException
     {
+        checkClosed();
         return _destination;
     }
 
     public void close() throws JMSException
     {
-        //TODO
+        try
+        {
+            if(!_closed)
+            {
+                _closed = true;
+                _sender.close();
+            }
+
+        }
+        catch (Sender.SenderClosingException e)
+        {
+            final JMSException jmsException = new JMSException("error closing");
+            jmsException.setLinkedException(e);
+            throw jmsException;
+        }
     }
 
     public void send(final Message message) throws JMSException
@@ -142,11 +178,22 @@ public class MessageProducerImpl impleme
         }
         else
         {
-            msg = convertMessage(message);
+            msg = _session.convertMessage(message);
         }
 
         msg.setJMSDeliveryMode(deliveryMode);
         msg.setJMSPriority(priority);
+
+        msg.setJMSDestination(_destination);
+
+        long timestamp = 0l;
+
+        if(!getDisableMessageTimestamp() && ttl==0)
+        {
+            timestamp = System.currentTimeMillis();
+            msg.setJMSTimestamp(timestamp);
+
+        }
         if(ttl != 0)
         {
             msg.setTtl(UnsignedInteger.valueOf(ttl));
@@ -155,30 +202,49 @@ public class MessageProducerImpl impleme
         {
             msg.setTtl(null);
         }
-        msg.setJMSDestination(_destination);
-        if(!getDisableMessageTimestamp())
+
+        if(!getDisableMessageID() && msg.getMessageId() == null)
         {
-            msg.setJMSTimestamp(System.currentTimeMillis());
+            final Binary messageId = generateMessageId();
+            msg.setMessageId(messageId);
+
         }
-        if(!getDisableMessageID() && msg.getMessageId() == null)
+
+        if(message != msg)
         {
-            msg.setMessageId(generateMessageId());
+            message.setJMSTimestamp(msg.getJMSTimestamp());
+            message.setJMSMessageID(msg.getJMSMessageID());
+            message.setJMSDeliveryMode(msg.getJMSDeliveryMode());
+            message.setJMSPriority(msg.getJMSPriority());
+            message.setJMSExpiration(msg.getJMSExpiration());
         }
 
+
         final org.apache.qpid.amqp_1_0.client.Message clientMessage = new org.apache.qpid.amqp_1_0.client.Message(msg.getSections());
 
         _sender.send(clientMessage);
+
+        if(getDestination() != null)
+        {
+            message.setJMSDestination(getDestination());
+        }
     }
 
-    private Binary generateMessageId()
+    public void send(final javax.jms.Queue queue, final Message message) throws JMSException
     {
-        UUID uuid = UUID.randomUUID();        
-        return new Binary(uuid.toString().getBytes());
+        send((Destination)queue, message);
+    }
+
+    public void send(final javax.jms.Queue queue, final Message message, final int deliveryMode, final int priority, final long ttl)
+            throws JMSException
+    {
+        send((Destination)queue, message, deliveryMode, priority, ttl);
     }
 
-    private MessageImpl convertMessage(final Message message)
+    private Binary generateMessageId()
     {
-        return null;  //TODO
+        UUID uuid = UUID.randomUUID();
+        return new Binary(uuid.toString().getBytes());
     }
 
     public void send(final Destination destination, final Message message) throws JMSException
@@ -191,4 +257,35 @@ public class MessageProducerImpl impleme
     {
         //TODO
     }
+
+    public Queue getQueue() throws JMSException
+    {
+        return (Queue) getDestination();
+    }
+
+    public Topic getTopic() throws JMSException
+    {
+        return (Topic) getDestination();
+    }
+
+    public void publish(final Message message) throws JMSException
+    {
+        send(message);
+    }
+
+    public void publish(final Message message, final int deliveryMode, final int priority, final long ttl) throws JMSException
+    {
+        send(message, deliveryMode, priority, ttl);
+    }
+
+    public void publish(final Topic topic, final Message message) throws JMSException
+    {
+        send(topic, message);
+    }
+
+    public void publish(final Topic topic, final Message message, final int deliveryMode, final int priority, final long ttl)
+            throws JMSException
+    {
+        send(topic, message, deliveryMode, priority, ttl);
+    }
 }

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ObjectMessageImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ObjectMessageImpl.java?rev=1165093&r1=1165092&r2=1165093&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ObjectMessageImpl.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ObjectMessageImpl.java Sun Sep  4 19:21:45 2011
@@ -20,26 +20,34 @@
 package org.apache.qpid.amqp_1_0.jms.impl;
 
 import org.apache.qpid.amqp_1_0.jms.ObjectMessage;
+import org.apache.qpid.amqp_1_0.type.Binary;
 import org.apache.qpid.amqp_1_0.type.Section;
+import org.apache.qpid.amqp_1_0.type.Symbol;
 import org.apache.qpid.amqp_1_0.type.messaging.*;
 import org.apache.qpid.amqp_1_0.type.messaging.Properties;
 
 import javax.jms.MessageNotWriteableException;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
 import java.io.Serializable;
 import java.util.*;
 
 public class ObjectMessageImpl extends MessageImpl implements ObjectMessage
 {
+    static final Symbol CONTENT_TYPE = Symbol.valueOf("application/x-java-serialized-object");
+
     private Serializable _object;
 
     protected ObjectMessageImpl(Header header,
                                 Properties properties,
-                                Footer footer,
                                 ApplicationProperties appProperties,
                                 Serializable object,
+                                Footer footer,
                                 SessionImpl session)
     {
         super(header, properties, appProperties, footer, session);
+        getProperties().setContentType(CONTENT_TYPE);
         _object = object;
     }
 
@@ -47,6 +55,7 @@ public class ObjectMessageImpl extends M
     {
         super(new Header(), new Properties(), new ApplicationProperties(new HashMap()), new Footer(Collections.EMPTY_MAP),
               session);
+        getProperties().setContentType(CONTENT_TYPE);
     }
 
     public void setObject(final Serializable serializable) throws MessageNotWriteableException
@@ -67,8 +76,22 @@ public class ObjectMessageImpl extends M
         sections.add(getHeader());
         sections.add(getProperties());
         sections.add(getApplicationProperties());
-        AmqpValue section = new AmqpValue(_object);
-        sections.add(section);
+
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        try
+        {
+            ObjectOutputStream oos = new ObjectOutputStream(baos);
+            oos.writeObject(_object);
+            oos.flush();
+            oos.close();
+            sections.add(new Data(new Binary(baos.toByteArray())));
+
+        }
+        catch (IOException e)
+        {
+            e.printStackTrace();  //TODO
+        }
+
         sections.add(getFooter());
         return sections;
     }

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueReceiverImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueReceiverImpl.java?rev=1165093&r1=1165092&r2=1165093&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueReceiverImpl.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueReceiverImpl.java Sun Sep  4 19:21:45 2011
@@ -19,6 +19,7 @@
 package org.apache.qpid.amqp_1_0.jms.impl;
 
 import org.apache.qpid.amqp_1_0.client.Receiver;
+import org.apache.qpid.amqp_1_0.jms.Queue;
 import org.apache.qpid.amqp_1_0.jms.QueueReceiver;
 
 import javax.jms.JMSException;
@@ -39,7 +40,7 @@ public class QueueReceiverImpl extends M
         return getSession().getClientSession().createMovingReceiver(getDestination().getAddress());
     }
 
-    public QueueImpl getQueue() throws JMSException
+    public Queue getQueue() throws JMSException
     {
         return (QueueImpl) getDestination();
     }

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java?rev=1165093&r1=1165092&r2=1165093&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java Sun Sep  4 19:21:45 2011
@@ -20,28 +20,35 @@ package org.apache.qpid.amqp_1_0.jms.imp
 
 import org.apache.qpid.amqp_1_0.client.Connection;
 import org.apache.qpid.amqp_1_0.client.Message;
+import org.apache.qpid.amqp_1_0.jms.QueueReceiver;
+import org.apache.qpid.amqp_1_0.jms.QueueSender;
+import org.apache.qpid.amqp_1_0.jms.QueueSession;
 import org.apache.qpid.amqp_1_0.jms.Session;
+import org.apache.qpid.amqp_1_0.jms.TopicPublisher;
+import org.apache.qpid.amqp_1_0.jms.TopicSession;
+import org.apache.qpid.amqp_1_0.jms.TopicSubscriber;
 
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.MessageListener;
-import javax.jms.Queue;
-import javax.jms.Topic;
+import javax.jms.*;
+import javax.jms.IllegalStateException;
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.Enumeration;
 import java.util.List;
 
-public class SessionImpl implements Session
+public class SessionImpl implements Session, QueueSession, TopicSession
 {
     private ConnectionImpl _connection;
     private AcknowledgeMode _acknowledgeMode;
     private org.apache.qpid.amqp_1_0.client.Session _session;
     private MessageFactory _messageFactory;
     private List<MessageConsumerImpl> _consumers = new ArrayList<MessageConsumerImpl>();
+    private List<MessageProducerImpl> _producers = new ArrayList<MessageProducerImpl>();
+
     private MessageListener _messageListener;
     private Dispatcher _dispatcher = new Dispatcher();
     private Thread _dispatcherThread;
 
+    private boolean _closed;
 
     protected SessionImpl(final ConnectionImpl connection, final AcknowledgeMode acknowledgeMode)
     {
@@ -55,29 +62,33 @@ public class SessionImpl implements Sess
         _dispatcherThread.start();
     }
 
-    public BytesMessageImpl createBytesMessage() throws JMSException
+    public BytesMessageImpl createBytesMessage() throws IllegalStateException
     {
+        checkClosed();
         return new BytesMessageImpl(this);
 
     }
 
     public MapMessageImpl createMapMessage() throws JMSException
     {
+        checkClosed();
         return new MapMessageImpl(this);
     }
 
-    public MessageImpl createMessage() throws JMSException
+    public MessageImpl createMessage() throws IllegalStateException
     {
         return createAmqpMessage();
     }
 
     public ObjectMessageImpl createObjectMessage() throws JMSException
     {
+        checkClosed();
         return new ObjectMessageImpl(this);
     }
 
     public ObjectMessageImpl createObjectMessage(final Serializable serializable) throws JMSException
     {
+        checkClosed();
         ObjectMessageImpl msg = new ObjectMessageImpl(this);
         msg.setObject(serializable);
         return msg;
@@ -85,55 +96,81 @@ public class SessionImpl implements Sess
 
     public StreamMessageImpl createStreamMessage() throws JMSException
     {
+        checkClosed();
         return new StreamMessageImpl(this);
     }
 
     public TextMessageImpl createTextMessage() throws JMSException
     {
+        checkClosed();
         return new TextMessageImpl(this);
     }
 
     public TextMessageImpl createTextMessage(final String s) throws JMSException
     {
+        checkClosed();
         TextMessageImpl msg = new TextMessageImpl(this);
         msg.setText(s);
         return msg;
     }
 
-    public AmqpMessageImpl createAmqpMessage() throws JMSException
+    public AmqpMessageImpl createAmqpMessage() throws IllegalStateException
     {
+        checkClosed();
         return new AmqpMessageImpl(this);
     }
 
     public boolean getTransacted() throws JMSException
     {
+        checkClosed();
         return _acknowledgeMode == AcknowledgeMode.SESSION_TRANSACTED;
     }
 
-    public int getAcknowledgeMode()
+    public int getAcknowledgeMode() throws IllegalStateException
     {
+        checkClosed();
         return _acknowledgeMode.ordinal();
     }
 
     public void commit() throws JMSException
     {
+        checkClosed();
         //TODO
     }
 
     public void rollback() throws JMSException
     {
+        checkClosed();
         //TODO
     }
 
     public void close() throws JMSException
     {
-        _dispatcher.close();
-        _session.close();
+        if(!_closed)
+        {
+            _closed = true;
+            _dispatcher.close();
+            for(MessageConsumerImpl consumer : _consumers)
+            {
+                consumer.close();
+            }
+            for(MessageProducerImpl producer : _producers)
+            {
+                producer.close();
+            }
+            _session.close();
+        }
+    }
 
+    private void checkClosed() throws IllegalStateException
+    {
+        if(_closed)
+            throw new IllegalStateException("Closed");
     }
 
     public void recover() throws JMSException
     {
+        checkClosed();
         //TODO
     }
 
@@ -161,22 +198,31 @@ public class SessionImpl implements Sess
 
     public MessageProducerImpl createProducer(final Destination destination) throws JMSException
     {
-        return new MessageProducerImpl(destination, this);
+        checkClosed();
+
+        final MessageProducerImpl messageProducer = new MessageProducerImpl(destination, this);
+
+        _producers.add(messageProducer);
+
+        return messageProducer;
     }
 
     public MessageConsumerImpl createConsumer(final Destination destination) throws JMSException
     {
+        checkClosed();
         return createConsumer(destination, null, false);
     }
 
     public MessageConsumerImpl createConsumer(final Destination destination, final String selector) throws JMSException
     {
+        checkClosed();
         return createConsumer(destination, selector, false);
     }
 
     public MessageConsumerImpl createConsumer(final Destination destination, final String selector, final boolean noLocal)
             throws JMSException
     {
+        checkClosed();
         final MessageConsumerImpl messageConsumer;
         synchronized(_session.getEndpoint().getLock())
         {
@@ -197,47 +243,92 @@ public class SessionImpl implements Sess
 
     public QueueImpl createQueue(final String s) throws JMSException
     {
+        checkClosed();
         return new QueueImpl(s);
     }
 
+    public QueueReceiver createReceiver(final Queue queue) throws JMSException
+    {
+        checkClosed();
+        return createConsumer(queue);
+    }
+
+    public QueueReceiver createReceiver(final Queue queue, final String selector) throws JMSException
+    {
+        checkClosed();
+        return createConsumer(queue, selector);
+    }
+
+    public QueueSender createSender(final Queue queue) throws JMSException
+    {
+        checkClosed();
+        return createProducer(queue);
+    }
+
     public TopicImpl createTopic(final String s) throws JMSException
     {
+        checkClosed();
         return new TopicImpl(s);
     }
 
+    public TopicSubscriber createSubscriber(final Topic topic) throws JMSException
+    {
+        checkClosed();
+        return createConsumer(topic);
+    }
+
+    public TopicSubscriber createSubscriber(final Topic topic, final String selector, final boolean noLocal) throws JMSException
+    {
+        checkClosed();
+        return createConsumer(topic, selector, noLocal);
+    }
+
     public TopicSubscriberImpl createDurableSubscriber(final Topic topic, final String name) throws JMSException
     {
+        checkClosed();
         return createDurableSubscriber(topic, name, null, false);
     }
 
     public TopicSubscriberImpl createDurableSubscriber(final Topic topic, final String name, final String selector, final boolean noLocal)
             throws JMSException
     {
+        checkClosed();
         return null;  //TODO
     }
 
+    public TopicPublisher createPublisher(final Topic topic) throws JMSException
+    {
+        checkClosed();
+        return createProducer(topic);
+    }
+
     public QueueBrowserImpl createBrowser(final Queue queue) throws JMSException
     {
+        checkClosed();
         return createBrowser(queue, null);
     }
 
     public QueueBrowserImpl createBrowser(final Queue queue, final String selector) throws JMSException
     {
+        checkClosed();
         return null;  //TODO
     }
 
     public TemporaryQueueImpl createTemporaryQueue() throws JMSException
     {
+        checkClosed();
         return null;  //TODO
     }
 
     public TemporaryTopicImpl createTemporaryTopic() throws JMSException
     {
+        checkClosed();
         return null;  //TODO
     }
 
     public void unsubscribe(final String s) throws JMSException
     {
+        checkClosed();
         //TODO
     }
 
@@ -286,6 +377,133 @@ public class SessionImpl implements Sess
         _dispatcher.messageArrivedAtConsumer(messageConsumer);
     }
 
+    MessageImpl convertMessage(final javax.jms.Message message) throws JMSException
+    {
+        MessageImpl replacementMessage;
+
+        if(message instanceof BytesMessage)
+        {
+            replacementMessage = convertBytesMessage((BytesMessage) message);
+        }
+        else if(message instanceof MapMessage)
+        {
+            replacementMessage = convertMapMessage((MapMessage) message);
+        }
+        else if(message instanceof ObjectMessage)
+        {
+            replacementMessage = convertObjectMessage((ObjectMessage) message);
+        }
+        else if(message instanceof StreamMessage)
+        {
+            replacementMessage = convertStreamMessage((StreamMessage) message);
+        }
+        else if(message instanceof TextMessage)
+        {
+            replacementMessage = convertTextMessage((TextMessage) message);
+        }
+        else
+        {
+            replacementMessage = createMessage();
+        }
+
+        convertMessageProperties(message, replacementMessage);
+
+        return replacementMessage;
+    }
+
+
+    private void convertMessageProperties(final javax.jms.Message message, final MessageImpl replacementMessage)
+            throws JMSException
+    {
+        Enumeration propertyNames = message.getPropertyNames();
+        while (propertyNames.hasMoreElements())
+        {
+            String propertyName = String.valueOf(propertyNames.nextElement());
+            // TODO: Shouldn't need to check for JMS properties here as don't think getPropertyNames() should return them
+            if (!propertyName.startsWith("JMSX_"))
+            {
+                Object value = message.getObjectProperty(propertyName);
+                replacementMessage.setObjectProperty(propertyName, value);
+            }
+        }
+
+
+        replacementMessage.setJMSDeliveryMode(message.getJMSDeliveryMode());
+
+        if (message.getJMSReplyTo() != null)
+        {
+            replacementMessage.setJMSReplyTo(message.getJMSReplyTo());
+        }
+
+        replacementMessage.setJMSType(message.getJMSType());
+
+        replacementMessage.setJMSCorrelationID(message.getJMSCorrelationID());
+    }
+
+    private MessageImpl convertMapMessage(final MapMessage message) throws JMSException
+    {
+        MapMessageImpl mapMessage = createMapMessage();
+
+        Enumeration mapNames = message.getMapNames();
+        while (mapNames.hasMoreElements())
+        {
+            String name = (String) mapNames.nextElement();
+            mapMessage.setObject(name, message.getObject(name));
+        }
+
+        return mapMessage;
+    }
+
+    private MessageImpl convertBytesMessage(final BytesMessage message) throws JMSException
+    {
+        BytesMessageImpl bytesMessage = createBytesMessage();
+
+        message.reset();
+
+        byte[] buf = new byte[1024];
+
+        int len;
+
+        while ((len = message.readBytes(buf)) != -1)
+        {
+            bytesMessage.writeBytes(buf, 0, len);
+        }
+
+        return bytesMessage;
+    }
+
+    private MessageImpl convertObjectMessage(final ObjectMessage message) throws JMSException
+    {
+        ObjectMessageImpl objectMessage = createObjectMessage();
+        objectMessage.setObject(message.getObject());
+        return objectMessage;
+    }
+
+    private MessageImpl convertStreamMessage(final StreamMessage message) throws JMSException
+    {
+        StreamMessageImpl streamMessage = createStreamMessage();
+
+        try
+        {
+            message.reset();
+            while (true)
+            {
+                streamMessage.writeObject(message.readObject());
+            }
+        }
+        catch (MessageEOFException e)
+        {
+            // we're at the end so don't mind the exception
+        }
+
+        return streamMessage;
+    }
+
+    private MessageImpl convertTextMessage(final TextMessage message) throws JMSException
+    {
+        return createTextMessage(message.getText());
+    }
+
 
     private class Dispatcher implements Runnable
     {
@@ -315,7 +533,7 @@ public class SessionImpl implements Sess
                     while(_started && !_messageConsumerList.isEmpty())
                     {
                         MessageConsumerImpl consumer = _messageConsumerList.remove(0);
-                        MessageListener listener = consumer.getMessageListener();
+                        MessageListener listener = consumer._messageListener;
                         Message msg = consumer.receive0(0L);
 
                         MessageImpl message = consumer.createJMSMessage(msg);

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/StreamMessageImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/StreamMessageImpl.java?rev=1165093&r1=1165092&r2=1165093&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/StreamMessageImpl.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/StreamMessageImpl.java Sun Sep  4 19:21:45 2011
@@ -25,6 +25,7 @@ import org.apache.qpid.amqp_1_0.type.mes
 import org.apache.qpid.amqp_1_0.type.messaging.Properties;
 
 import javax.jms.JMSException;
+import javax.jms.MessageFormatException;
 import java.util.*;
 
 public class StreamMessageImpl extends MessageImpl implements StreamMessage
@@ -34,6 +35,8 @@ public class StreamMessageImpl extends M
     private int _position = -1;
     private int _offset = -1;
 
+
+
     protected StreamMessageImpl(Header header, Properties properties, ApplicationProperties appProperties, List list,
                                 Footer footer, SessionImpl session)
     {
@@ -59,52 +62,197 @@ public class StreamMessageImpl extends M
 
     public boolean readBoolean() throws JMSException
     {
-        return false;  //TODO
+        Object obj = readObject();
+        if(obj instanceof Boolean)
+        {
+            return (Boolean) obj;
+        }
+        if(obj instanceof String || obj == null)
+        {
+            return Boolean.valueOf((String)obj);
+        }
+        else
+        {
+            throw new MessageFormatException("Cannot read " + obj.getClass().getName() + " as boolean");
+        }
+    }
+
+    @Override
+    public void clearBody() throws JMSException
+    {
+        super.clearBody();
+        _list.clear();
+        _position = -1;
+        _offset = -1;
     }
 
     public byte readByte() throws JMSException
     {
-        return 0;  //TODO
+        Object obj = readObject();
+        if(obj instanceof Byte)
+        {
+            return (Byte) obj;
+        }
+        else if(obj instanceof String || obj == null)
+        {
+            return Byte.valueOf((String)obj);
+        }
+        else
+        {
+            throw new MessageFormatException("Cannot convert value of type " + obj.getClass().getName());
+        }
     }
 
     public short readShort() throws JMSException
     {
-        return 0;  //TODO
+        Object obj = readObject();
+        if(obj instanceof Short)
+        {
+            return (Short) obj;
+        }
+        else if(obj instanceof Byte)
+        {
+            return (Byte) obj;
+        }
+        else if(obj instanceof String || obj == null)
+        {
+            return Short.valueOf((String)obj);
+        }
+        else
+        {
+            throw new MessageFormatException("Cannot convert value of type " + obj.getClass().getName());
+        }
+
     }
 
     public char readChar() throws JMSException
     {
-        return 0;  //TODO
+        Object obj = readObject();
+        if(obj instanceof Character)
+        {
+            return (Character) obj;
+        }
+        if(obj == null)
+        {
+            throw new NullPointerException();
+        }
+        else
+        {
+            throw new MessageFormatException("Cannot read " + obj.getClass().getName() + " as boolean");
+        }
+
     }
 
     public int readInt() throws JMSException
     {
-        return 0;  //TODO
+        Object obj = readObject();
+        if(obj instanceof Integer)
+        {
+            return (Integer) obj;
+        }
+        else if(obj instanceof Short)
+        {
+            return (Short) obj;
+        }
+        else if(obj instanceof Byte)
+        {
+            return (Byte) obj;
+        }
+        else if(obj instanceof String || obj == null)
+        {
+            return Integer.valueOf((String)obj);
+        }
+        else
+        {
+            throw new MessageFormatException("Cannot convert value of type " + obj.getClass().getName());
+        }
     }
 
     public long readLong() throws JMSException
     {
-        return 0;  //TODO
+        Object obj = readObject();
+        if(obj instanceof Long)
+        {
+            return (Long) obj;
+        }
+        else if(obj instanceof Integer)
+        {
+            return (Integer) obj;
+        }
+        else if(obj instanceof Short)
+        {
+            return (Short) obj;
+        }
+        else if(obj instanceof Byte)
+        {
+            return (Byte) obj;
+        }
+        else if(obj instanceof String || obj == null)
+        {
+            return Long.valueOf((String)obj);
+        }
+        else
+        {
+            throw new MessageFormatException("Cannot convert value of type " + obj.getClass().getName());
+        }
     }
 
     public float readFloat() throws JMSException
     {
-        return 0;  //TODO
+        Object obj = readObject();
+        if(obj instanceof Float)
+        {
+            return (Float) obj;
+        }
+        else if(obj instanceof String || obj == null)
+        {
+            return Float.valueOf((String)obj);
+        }
+        else
+        {
+            throw new MessageFormatException("Cannot convert value of type " + obj.getClass().getName());
+        }
     }
 
     public double readDouble() throws JMSException
     {
-        return 0;  //TODO
+        Object obj = readObject();
+        if(obj instanceof Double)
+        {
+            return (Double) obj;
+        }
+        else if(obj instanceof Float)
+        {
+            return (Float) obj;
+        }
+        else if(obj instanceof String || obj == null)
+        {
+            return Double.valueOf((String)obj);
+        }
+        else
+        {
+            throw new MessageFormatException("Cannot convert value of type " + obj.getClass().getName());
+        }
     }
 
     public String readString() throws JMSException
     {
-        return null;  //TODO
+        Object obj = readObject();
+        if(obj instanceof byte[])
+        {
+            throw new MessageFormatException("Cannot convert value of type " + obj.getClass().getName());
+        }
+        return String.valueOf(obj);
     }
 
     public int readBytes(final byte[] bytes) throws JMSException
     {
-        return 0;  //TODO
+        Object obj = readObject();
+        if(!(obj instanceof byte[]))
+        {
+            throw new MessageFormatException("Cannot convert value of type " + obj.getClass().getName());
+        }
+        return -1;
     }
 
     public Object readObject() throws JMSException
@@ -121,7 +269,7 @@ public class StreamMessageImpl extends M
 
     public void writeBoolean(final boolean b) throws JMSException
     {
-        //TODO
+        _list.add(b);
     }
 
     public void writeByte(final byte b) throws JMSException
@@ -176,12 +324,16 @@ public class StreamMessageImpl extends M
 
     public void writeObject(final Object o) throws JMSException
     {
-        //TODO
+        if(o == null || _supportedClasses.contains(o.getClass()))
+        {
+            _list.add(o);
+        }
     }
 
     public void reset() throws JMSException
     {
-        //TODO
+        _position = -1;
+        _offset = -1;
     }
 
     @Override Collection<Section> getSections()



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org