You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2011/09/04 23:08:41 UTC

svn commit: r1165110 - in /qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms: example/src/main/java/org/apache/qpid/amqp_1_0/jms/example/ src/main/java/org/apache/qpid/amqp_1_0/jms/impl/

Author: rgodfrey
Date: Sun Sep  4 21:08:40 2011
New Revision: 1165110

URL: http://svn.apache.org/viewvc?rev=1165110&view=rev
Log:
NO-JIRA: JMS Fixes

Modified:
    qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/example/src/main/java/org/apache/qpid/amqp_1_0/jms/example/Hello.java
    qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/AmqpMessageImpl.java
    qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/BytesMessageImpl.java
    qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionFactoryImpl.java
    qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java
    qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MapMessageImpl.java
    qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java
    qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageFactory.java
    qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageImpl.java
    qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageProducerImpl.java
    qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ObjectMessageImpl.java
    qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueImpl.java
    qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueReceiverImpl.java
    qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueSessionImpl.java
    qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java
    qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/StreamMessageImpl.java
    qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TemporaryQueueImpl.java
    qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TextMessageImpl.java
    qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TopicImpl.java
    qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TopicSessionImpl.java
    qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TopicSubscriberImpl.java

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/example/src/main/java/org/apache/qpid/amqp_1_0/jms/example/Hello.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/example/src/main/java/org/apache/qpid/amqp_1_0/jms/example/Hello.java?rev=1165110&r1=1165109&r2=1165110&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/example/src/main/java/org/apache/qpid/amqp_1_0/jms/example/Hello.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/example/src/main/java/org/apache/qpid/amqp_1_0/jms/example/Hello.java Sun Sep  4 21:08:40 2011
@@ -110,6 +110,18 @@ public class Hello
                             System.out.println(o.getClass().getName() + ": " + o);
 
                         }
+                        else if(message instanceof ObjectMessage)
+                        {
+                            System.out.println("Received Object Message:");
+                            System.out.println("========================");
+                            ObjectMessage objectMessage = (ObjectMessage)message;
+                            Object o = objectMessage.getObject();
+                            System.out.println(o.getClass().getName() + ": " + o);
+                        }
+                        else
+                        {
+                            System.out.println("Received Message " + message.getClass().getName());
+                        }
                     }
                     catch (JMSException e)
                     {
@@ -124,8 +136,9 @@ public class Hello
 
             MessageProducer messageProducer = producersession.createProducer(queue);
             TextMessage message = producersession.createTextMessage("Hello world!");
+            message.setJMSType("Hello");
             messageProducer.send(message);
-
+           /*
             MapMessage mapmessage = producersession.createMapMessage();
             mapmessage.setBoolean("mybool", true);
             mapmessage.setString("mystring", "hello");
@@ -139,13 +152,18 @@ public class Hello
 
             messageProducer.send(bytesMessage);
 
-            StreamMessage streamMessage = producersession.createStreamMessage();
+            ObjectMessage objectMessage = producersession.createObjectMessage();
+            objectMessage.setObject(new Double("3.14159265358979323846264338327950288"));
+
+            messageProducer.send(objectMessage);
+
+/*          StreamMessage streamMessage = producersession.createStreamMessage();
             streamMessage.writeBoolean(true);
             streamMessage.writeLong(18031974L);
             streamMessage.writeString("this is a stream Message");
             streamMessage.writeChar('£');
             messageProducer.send(streamMessage);
-
+*/
             Thread.sleep(50000L);
 
             connection.close();

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/AmqpMessageImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/AmqpMessageImpl.java?rev=1165110&r1=1165109&r2=1165110&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/AmqpMessageImpl.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/AmqpMessageImpl.java Sun Sep  4 21:08:40 2011
@@ -23,6 +23,7 @@ import org.apache.qpid.amqp_1_0.type.Sec
 import org.apache.qpid.amqp_1_0.type.messaging.ApplicationProperties;
 import org.apache.qpid.amqp_1_0.type.messaging.Footer;
 import org.apache.qpid.amqp_1_0.type.messaging.Header;
+import org.apache.qpid.amqp_1_0.type.messaging.MessageAnnotations;
 import org.apache.qpid.amqp_1_0.type.messaging.Properties;
 
 import java.util.*;
@@ -31,16 +32,16 @@ public class AmqpMessageImpl extends Mes
 {
     private List<Section> _sections;
 
-    protected AmqpMessageImpl(Header header, Properties properties, ApplicationProperties appProperties, List<Section> sections,
+    protected AmqpMessageImpl(Header header, MessageAnnotations messageAnnotations, Properties properties, ApplicationProperties appProperties, List<Section> sections,
                               Footer footer, SessionImpl session)
     {
-        super(header, properties, appProperties, footer, session);
+        super(header, messageAnnotations, properties, appProperties, footer, session);
         _sections = sections;
     }
 
     protected AmqpMessageImpl(final SessionImpl session)
     {
-        super(new Header(), new Properties(), new ApplicationProperties(new HashMap()), new Footer(Collections.EMPTY_MAP),
+        super(new Header(), new MessageAnnotations(new HashMap()), new Properties(), new ApplicationProperties(new HashMap()), new Footer(Collections.EMPTY_MAP),
               session);
         _sections = new ArrayList<Section>();
     }
@@ -64,6 +65,10 @@ public class AmqpMessageImpl extends Mes
     {
         List<Section> sections = new ArrayList<Section>();
         sections.add(getHeader());
+        if(getMessageAnnotations() != null && getMessageAnnotations().getValue() != null && !getMessageAnnotations().getValue().isEmpty())
+        {
+            sections.add(getMessageAnnotations());
+        }
         sections.add(getProperties());
         sections.add(getApplicationProperties());
         sections.addAll(_sections);

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/BytesMessageImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/BytesMessageImpl.java?rev=1165110&r1=1165109&r2=1165110&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/BytesMessageImpl.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/BytesMessageImpl.java Sun Sep  4 21:08:40 2011
@@ -39,10 +39,10 @@ public class BytesMessageImpl extends Me
     private Data _dataIn;
 
     // message created for reading
-    protected BytesMessageImpl(Header header, Properties properties, ApplicationProperties appProperties, Data data,
+    protected BytesMessageImpl(Header header, MessageAnnotations messageAnnotations, Properties properties, ApplicationProperties appProperties, Data data,
                                Footer footer, SessionImpl session)
     {
-        super(header, properties, appProperties, footer, session);
+        super(header, messageAnnotations, properties, appProperties, footer, session);
         _dataIn = data;
         final Binary dataBuffer = data.getValue();
         _dataAsInput = new DataInputStream(new ByteArrayInputStream(dataBuffer.getArray(),dataBuffer.getArrayOffset(),dataBuffer.getLength()));
@@ -52,7 +52,11 @@ public class BytesMessageImpl extends Me
     // message created to be sent
     protected BytesMessageImpl(final SessionImpl session)
     {
-        super(new Header(), new Properties(), new ApplicationProperties(new HashMap()), new Footer(Collections.EMPTY_MAP),
+        super(new Header(),
+              new MessageAnnotations(new HashMap()),
+              new Properties(),
+              new ApplicationProperties(new HashMap()),
+              new Footer(Collections.EMPTY_MAP),
               session);
 
         _bytesOut = new ByteArrayOutputStream();
@@ -496,6 +500,10 @@ public class BytesMessageImpl extends Me
     {
         List<Section> sections = new ArrayList<Section>();
         sections.add(getHeader());
+        if(getMessageAnnotations() != null && getMessageAnnotations().getValue() != null && !getMessageAnnotations().getValue().isEmpty())
+        {
+            sections.add(getMessageAnnotations());
+        }
         sections.add(getProperties());
         sections.add(getApplicationProperties());
         sections.add(getDataSection());

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionFactoryImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionFactoryImpl.java?rev=1165110&r1=1165109&r2=1165110&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionFactoryImpl.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionFactoryImpl.java Sun Sep  4 21:08:40 2011
@@ -104,21 +104,29 @@ public class ConnectionFactoryImpl imple
 
     public QueueConnection createQueueConnection() throws JMSException
     {
-        return createConnection();
+        final ConnectionImpl connection = createConnection();
+        connection.setQueueConnection(true);
+        return connection;
     }
 
     public QueueConnection createQueueConnection(final String username, final String password) throws JMSException
     {
-        return createConnection(username, password);
+        final ConnectionImpl connection = createConnection(username, password);
+        connection.setQueueConnection(true);
+        return connection;
     }
 
     public TopicConnection createTopicConnection() throws JMSException
     {
-        return createConnection();
+        final ConnectionImpl connection = createConnection();
+        connection.setTopicConnection(true);
+        return connection;
     }
 
     public TopicConnection createTopicConnection(final String username, final String password) throws JMSException
     {
-        return createConnection(username, password);
+        final ConnectionImpl connection = createConnection(username, password);
+        connection.setTopicConnection(true);
+        return connection;
     }
 }

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java?rev=1165110&r1=1165109&r2=1165110&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java Sun Sep  4 21:08:40 2011
@@ -21,6 +21,7 @@ package org.apache.qpid.amqp_1_0.jms.imp
 import org.apache.qpid.amqp_1_0.jms.Connection;
 import org.apache.qpid.amqp_1_0.jms.ConnectionMetaData;
 import org.apache.qpid.amqp_1_0.jms.Session;
+import org.apache.qpid.amqp_1_0.transport.Container;
 
 import javax.jms.*;
 import javax.jms.IllegalStateException;
@@ -38,6 +39,8 @@ public class ConnectionImpl implements C
     private final Object _lock = new Object();
 
     private org.apache.qpid.amqp_1_0.client.Connection _conn;
+    private boolean _isQueueConnection;
+    private boolean _isTopicConnection;
 
 
     private static enum State
@@ -51,10 +54,11 @@ public class ConnectionImpl implements C
 
     public ConnectionImpl(String host, int port, String username, String password, String clientId) throws JMSException
     {
+        Container container = clientId == null ? new Container() : new Container(clientId);
         // TODO - authentication, containerId, clientId, ssl?, etc
         try
         {
-            _conn = new org.apache.qpid.amqp_1_0.client.Connection(host, port, username, password);
+            _conn = new org.apache.qpid.amqp_1_0.client.Connection(host, port, username, password, container);
             // TODO - retrieve negotiated AMQP version
             _connectionMetaData = new ConnectionMetaDataImpl(1,0,0);
         }
@@ -97,7 +101,8 @@ public class ConnectionImpl implements C
             }
 
             SessionImpl session = new SessionImpl(this, acknowledgeMode);
-
+            session.setQueueSession(_isQueueConnection);
+            session.setTopicSession(_isTopicConnection);
             _sessions.add(session);
 
             return session;
@@ -108,7 +113,7 @@ public class ConnectionImpl implements C
     public String getClientID() throws JMSException
     {
         checkClosed();
-        return null;  //TODO
+        return _conn.getEndpoint().getContainer().getId();
     }
 
     public void setClientID(final String s) throws JMSException
@@ -269,4 +274,13 @@ public class ConnectionImpl implements C
         }
     }
 
+    void setQueueConnection(final boolean queueConnection)
+    {
+        _isQueueConnection = queueConnection;
+    }
+
+    void setTopicConnection(final boolean topicConnection)
+    {
+        _isTopicConnection = topicConnection;
+    }
 }

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MapMessageImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MapMessageImpl.java?rev=1165110&r1=1165109&r2=1165110&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MapMessageImpl.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MapMessageImpl.java Sun Sep  4 21:08:40 2011
@@ -32,17 +32,18 @@ public class MapMessageImpl extends Mess
 {
     private Map _map;
 
-    public MapMessageImpl(Header header, Properties properties, ApplicationProperties appProperties, Map map,
+    public MapMessageImpl(Header header, MessageAnnotations messageAnnotations, Properties properties, ApplicationProperties appProperties, Map map,
                           Footer footer,
                           SessionImpl session)
     {
-        super(header, properties, appProperties, footer, session);
+        super(header, messageAnnotations, properties, appProperties, footer, session);
         _map = map;
     }
 
     MapMessageImpl(final SessionImpl session)
     {
-        super(new Header(), new Properties(), new ApplicationProperties(new HashMap()), new Footer(Collections.EMPTY_MAP),
+        super(new Header(), new MessageAnnotations(new HashMap()),
+              new Properties(), new ApplicationProperties(new HashMap()), new Footer(Collections.EMPTY_MAP),
               session);
         _map = new HashMap();
     }
@@ -417,6 +418,10 @@ public class MapMessageImpl extends Mess
     {
         List<Section> sections = new ArrayList<Section>();
         sections.add(getHeader());
+        if(getMessageAnnotations() != null && getMessageAnnotations().getValue() != null && !getMessageAnnotations().getValue().isEmpty())
+        {
+            sections.add(getMessageAnnotations());
+        }
         sections.add(getProperties());
         sections.add(getApplicationProperties());
         sections.add(new AmqpValue(_map));

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java?rev=1165110&r1=1165109&r2=1165110&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java Sun Sep  4 21:08:40 2011
@@ -41,6 +41,9 @@ public class MessageConsumerImpl impleme
     private Binary _lastUnackedMessage;
     MessageListener _messageListener;
 
+    private boolean _isQueueConsumer;
+    private boolean _isTopicSubscriber;
+
     private boolean _closed = false;
 
     MessageConsumerImpl(final Destination destination,
@@ -53,10 +56,18 @@ public class MessageConsumerImpl impleme
         if(destination instanceof DestinationImpl)
         {
             _destination = (DestinationImpl) destination;
+            if(destination instanceof javax.jms.Queue)
+            {
+                _isQueueConsumer = true;
+            }
+            else if(destination instanceof javax.jms.Topic)
+            {
+                _isTopicSubscriber = true;
+            }
         }
-        else if(destination != null)
+        else
         {
-            // TODO - throw appropriate exception
+            throw new InvalidDestinationException("Invalid destination class");
         }
         _session = session;
 
@@ -64,7 +75,7 @@ public class MessageConsumerImpl impleme
 
     }
 
-    protected Receiver createClientReceiver()
+    protected Receiver createClientReceiver() throws IllegalStateException
     {
         return _session.getClientSession().createReceiver(_destination.getAddress());
     }
@@ -142,7 +153,10 @@ public class MessageConsumerImpl impleme
         if(msg != null)
         {
             MessageFactory factory = _session.getMessageFactory();
-            return factory.createMessage(_destination, msg);
+            final MessageImpl message = factory.createMessage(_destination, msg);
+            message.setFromQueue(_isQueueConsumer);
+            message.setFromTopic(_isTopicSubscriber);
+            return message;
         }
         else
         {
@@ -230,4 +244,14 @@ public class MessageConsumerImpl impleme
     {
         return (Topic) getDestination();
     }
+
+    void setQueueConsumer(final boolean queueConsumer)
+    {
+        _isQueueConsumer = queueConsumer;
+    }
+
+    void setTopicSubscriber(final boolean topicSubscriber)
+    {
+        _isTopicSubscriber = topicSubscriber;
+    }
 }

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageFactory.java?rev=1165110&r1=1165109&r2=1165110&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageFactory.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageFactory.java Sun Sep  4 21:08:40 2011
@@ -46,6 +46,8 @@ class MessageFactory
         MessageImpl message;
         List<Section> payload = msg.getPayload();
         Header header = null;
+        MessageAnnotations messageAnnotations = null;
+
         Properties properties = null;
         ApplicationProperties appProperties = null;
         Footer footer;
@@ -61,6 +63,12 @@ class MessageFactory
             section = iter.hasNext() ? iter.next() : null;
         }
 
+        if(section instanceof MessageAnnotations)
+        {
+            messageAnnotations = (MessageAnnotations) section;
+            section = iter.hasNext() ? iter.next() : null;
+        }
+
         if(section instanceof Properties)
         {
             properties = (Properties) section;
@@ -86,16 +94,16 @@ class MessageFactory
             Section bodySection = body.get(0);
             if(bodySection instanceof AmqpValue && ((AmqpValue)bodySection).getValue() instanceof Map)
             {
-                message = new MapMessageImpl(header, properties, appProperties, (Map) ((AmqpValue)bodySection).getValue(), footer, _session);
+                message = new MapMessageImpl(header, messageAnnotations, properties, appProperties, (Map) ((AmqpValue)bodySection).getValue(), footer, _session);
             }
             else if(bodySection instanceof AmqpValue && ((AmqpValue)bodySection).getValue() instanceof List)
             {
-                message = new StreamMessageImpl(header, properties, appProperties,
+                message = new StreamMessageImpl(header, messageAnnotations, properties, appProperties,
                                                 (List) ((AmqpValue)bodySection).getValue(), footer, _session);
             }
             else if(bodySection instanceof AmqpValue && ((AmqpValue)bodySection).getValue() instanceof String)
             {
-                message = new TextMessageImpl(header, properties, appProperties,
+                message = new TextMessageImpl(header, messageAnnotations, properties, appProperties,
                                                 (String) ((AmqpValue)bodySection).getValue(), footer, _session);
             }
             else if(bodySection instanceof Data)
@@ -120,16 +128,16 @@ class MessageFactory
                         e.printStackTrace();  //TODO
                     }
 
-                    message = new ObjectMessageImpl(header, properties, appProperties, serializable, footer, _session);
+                    message = new ObjectMessageImpl(header, messageAnnotations, properties, appProperties, serializable, footer, _session);
                 }
                 else
                 {
-                    message = new BytesMessageImpl(header, properties, appProperties, (Data) bodySection, footer, _session);
+                    message = new BytesMessageImpl(header, messageAnnotations, properties, appProperties, (Data) bodySection, footer, _session);
                 }
             }
             else if(bodySection instanceof AmqpSequence)
             {
-                message = new StreamMessageImpl(header, properties, appProperties, ((AmqpSequence) bodySection).getValue(), footer, _session);
+                message = new StreamMessageImpl(header, messageAnnotations, properties, appProperties, ((AmqpSequence) bodySection).getValue(), footer, _session);
             }
 
             /*else if(bodySection instanceof AmqpDataSection)
@@ -174,12 +182,12 @@ class MessageFactory
             }*/
             else
             {
-                message = new AmqpMessageImpl(header,properties,appProperties,body,footer, _session);
+                message = new AmqpMessageImpl(header,messageAnnotations, properties,appProperties,body,footer, _session);
             }
         }
         else
         {
-            message = new AmqpMessageImpl(header,properties,appProperties,body,footer, _session);
+            message = new AmqpMessageImpl(header,messageAnnotations, properties,appProperties,body,footer, _session);
         }
 
         message.setReadOnly();

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageImpl.java?rev=1165110&r1=1165109&r2=1165110&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageImpl.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageImpl.java Sun Sep  4 21:08:40 2011
@@ -33,6 +33,7 @@ import org.apache.qpid.amqp_1_0.type.Uns
 import org.apache.qpid.amqp_1_0.type.messaging.ApplicationProperties;
 import org.apache.qpid.amqp_1_0.type.messaging.Footer;
 import org.apache.qpid.amqp_1_0.type.messaging.Header;
+import org.apache.qpid.amqp_1_0.type.messaging.MessageAnnotations;
 import org.apache.qpid.amqp_1_0.type.messaging.Properties;
 
 import javax.jms.DeliveryMode;
@@ -49,6 +50,7 @@ public abstract class MessageImpl implem
     static final Set<Class> _supportedClasses =
                 new HashSet<Class>(Arrays.asList(Boolean.class, Byte.class, Short.class, Integer.class, Long.class,
                                                  Float.class, Double.class, Character.class, String.class, byte[].class));
+    private static final Symbol JMS_TYPE = Symbol.valueOf("apache.qpid.amqp_1_0-jms-type");
 
     private Header _header;
     private Properties _properties;
@@ -57,8 +59,13 @@ public abstract class MessageImpl implem
     public static final Charset UTF_8_CHARSET = Charset.forName("UTF-8");
     private SessionImpl _sessionImpl;
     private boolean _readOnly;
+    private MessageAnnotations _messageAnnotations;
+
+    private boolean _isFromQueue;
+    private boolean _isFromTopic;
 
     protected MessageImpl(Header header,
+                          MessageAnnotations messageAnnotations,
                           Properties properties,
                           ApplicationProperties appProperties,
                           Footer footer,
@@ -66,6 +73,7 @@ public abstract class MessageImpl implem
     {
         _header = header == null ? new Header() : header;
         _properties = properties == null ? new Properties() : properties;
+        _messageAnnotations = messageAnnotations == null ? new MessageAnnotations(new HashMap()) : messageAnnotations;
         _footer = footer == null ? new Footer(Collections.EMPTY_MAP) : footer;
         _applicationProperties = appProperties == null ? new ApplicationProperties(new HashMap()) : appProperties;
         _sessionImpl = session;
@@ -170,7 +178,9 @@ public abstract class MessageImpl implem
 
     public DestinationImpl getJMSDestination() throws JMSException
     {
-        return DestinationImpl.valueOf(getTo());
+        return _isFromQueue ? QueueImpl.valueOf(getTo())
+                            : _isFromTopic ? TopicImpl.valueOf(getTo())
+                                           : DestinationImpl.valueOf(getTo());
     }
 
     public void setJMSDestination(Destination destination) throws NonAMQPDestinationException
@@ -240,20 +250,22 @@ public abstract class MessageImpl implem
 
     public String getJMSType() throws JMSException
     {
-        final MessageAttributes messageAttrs = getHeaderMessageAttrs();
-        final Object attrValue = messageAttrs == null ? null : messageAttrs.get(Symbol.valueOf("apache.qpid.amqp_1_0-jms-type"));
+        Map messageAttrs = _messageAnnotations == null ? null : _messageAnnotations.getValue();
+        final Object attrValue = messageAttrs == null ? null : messageAttrs.get(JMS_TYPE);
 
         return attrValue instanceof String ? attrValue.toString() : null;
     }
 
     public void setJMSType(String s) throws JMSException
     {
-        MessageAttributes messageAttrs = getHeaderMessageAttrs();
+        Map messageAttrs = _messageAnnotations == null ? null : _messageAnnotations.getValue();
         if(messageAttrs == null)
         {
-            // TODO - Solve MessageAttrs problem
-            messageAttrs = null;
+            messageAttrs = new HashMap();
+            _messageAnnotations = new MessageAnnotations(messageAttrs);
         }
+
+        messageAttrs.put(JMS_TYPE, s);
     }
 
     public long getJMSExpiration() throws JMSException
@@ -1056,10 +1068,30 @@ public abstract class MessageImpl implem
         return _footer;
     }
 
+    MessageAnnotations getMessageAnnotations()
+    {
+        return _messageAnnotations;
+    }
+
     public ApplicationProperties getApplicationProperties()
     {
         return _applicationProperties;
     }
 
+    public void reset() throws JMSException
+    {
+        _readOnly = true;
+    }
+
+    void setFromQueue(final boolean fromQueue)
+    {
+        _isFromQueue = fromQueue;
+    }
+
+    void setFromTopic(final boolean fromTopic)
+    {
+        _isFromTopic = fromTopic;
+    }
+
     abstract Collection<Section> getSections();
 }

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageProducerImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageProducerImpl.java?rev=1165110&r1=1165109&r2=1165110&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageProducerImpl.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageProducerImpl.java Sun Sep  4 21:08:40 2011
@@ -52,21 +52,24 @@ public class MessageProducerImpl impleme
         }
         else if(destination != null)
         {
-            // TODO - throw appropriate exception
+            throw new InvalidDestinationException("Invalid Destination Class" + destination.getClass().getName());
         }
         _session = session;
 
-        try
-        {
-            _sender = _session.getClientSession().createSender(_destination.getAddress());
-        }
-        catch (Sender.SenderCreationException e)
+        if(_destination != null)
         {
-            // TODO - refine exception
-            JMSException jmsEx = new JMSException(e.getMessage());
-            jmsEx.initCause(e);
-            jmsEx.setLinkedException(e);
-            throw jmsEx;
+            try
+            {
+                _sender = _session.getClientSession().createSender(_destination.getAddress());
+            }
+            catch (Sender.SenderCreationException e)
+            {
+                // TODO - refine exception
+                JMSException jmsEx = new JMSException(e.getMessage());
+                jmsEx.initCause(e);
+                jmsEx.setLinkedException(e);
+                throw jmsEx;
+            }
         }
     }
 
@@ -151,7 +154,10 @@ public class MessageProducerImpl impleme
             if(!_closed)
             {
                 _closed = true;
-                _sender.close();
+                if(_sender != null)
+                {
+                    _sender.close();
+                }
             }
 
         }
@@ -255,7 +261,55 @@ public class MessageProducerImpl impleme
     public void send(final Destination destination, final Message message, final int deliveryMode, final int priority, final long ttl)
             throws JMSException
     {
-        //TODO
+
+        checkClosed();
+        if(destination == null)
+        {
+            send(message, deliveryMode, priority, ttl);
+        }
+        else
+        {
+            if(_destination != null)
+            {
+                throw new UnsupportedOperationException("Cannot use explicit destination pon non-anonymous producer");
+            }
+            else if(!(destination instanceof DestinationImpl))
+            {
+                throw new InvalidDestinationException("Invalid Destination Class" + destination.getClass().getName());
+            }
+            try
+            {
+                _destination = (DestinationImpl) destination;
+                _sender = _session.getClientSession().createSender(_destination.getAddress());
+
+                send(message, deliveryMode, priority, ttl);
+
+                _sender.close();
+
+
+
+            }
+            catch (Sender.SenderCreationException e)
+            {
+                // TODO - refine exception
+                JMSException jmsEx = new JMSException(e.getMessage());
+                jmsEx.initCause(e);
+                jmsEx.setLinkedException(e);
+                throw jmsEx;
+            }
+            catch (Sender.SenderClosingException e)
+            {
+                JMSException jmsEx = new JMSException(e.getMessage());
+                jmsEx.initCause(e);
+                jmsEx.setLinkedException(e);
+                throw jmsEx;
+            }
+            finally
+            {
+                _sender = null;
+                _destination = null;
+            }
+        }
     }
 
     public Queue getQueue() throws JMSException

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ObjectMessageImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ObjectMessageImpl.java?rev=1165110&r1=1165109&r2=1165110&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ObjectMessageImpl.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ObjectMessageImpl.java Sun Sep  4 21:08:40 2011
@@ -40,20 +40,22 @@ public class ObjectMessageImpl extends M
     private Serializable _object;
 
     protected ObjectMessageImpl(Header header,
+                                MessageAnnotations messageAnnotations,
                                 Properties properties,
                                 ApplicationProperties appProperties,
                                 Serializable object,
                                 Footer footer,
                                 SessionImpl session)
     {
-        super(header, properties, appProperties, footer, session);
+        super(header, messageAnnotations, properties, appProperties, footer, session);
         getProperties().setContentType(CONTENT_TYPE);
         _object = object;
     }
 
     protected ObjectMessageImpl(final SessionImpl session)
     {
-        super(new Header(), new Properties(), new ApplicationProperties(new HashMap()), new Footer(Collections.EMPTY_MAP),
+        super(new Header(), new MessageAnnotations(new HashMap()),
+              new Properties(), new ApplicationProperties(new HashMap()), new Footer(Collections.EMPTY_MAP),
               session);
         getProperties().setContentType(CONTENT_TYPE);
     }
@@ -74,6 +76,10 @@ public class ObjectMessageImpl extends M
     {
         List<Section> sections = new ArrayList<Section>();
         sections.add(getHeader());
+        if(getMessageAnnotations() != null && getMessageAnnotations().getValue() != null && !getMessageAnnotations().getValue().isEmpty())
+        {
+            sections.add(getMessageAnnotations());
+        }
         sections.add(getProperties());
         sections.add(getApplicationProperties());
 

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueImpl.java?rev=1165110&r1=1165109&r2=1165110&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueImpl.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueImpl.java Sun Sep  4 21:08:40 2011
@@ -48,4 +48,9 @@ public class QueueImpl extends Destinati
         return queue;
     }
 
+    public static QueueImpl valueOf(String address)
+    {
+        return address == null ? null : createQueue(address);
+    }
+
 }

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueReceiverImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueReceiverImpl.java?rev=1165110&r1=1165109&r2=1165110&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueReceiverImpl.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueReceiverImpl.java Sun Sep  4 21:08:40 2011
@@ -22,7 +22,7 @@ import org.apache.qpid.amqp_1_0.client.R
 import org.apache.qpid.amqp_1_0.jms.Queue;
 import org.apache.qpid.amqp_1_0.jms.QueueReceiver;
 
-import javax.jms.JMSException;
+import javax.jms.*;
 
 public class QueueReceiverImpl extends MessageConsumerImpl implements QueueReceiver
 {
@@ -33,9 +33,10 @@ public class QueueReceiverImpl extends M
             throws JMSException
     {
         super(destination, session, selector, noLocal);
+        setQueueConsumer(true);
     }
 
-    protected Receiver createClientReceiver()
+    protected Receiver createClientReceiver() throws javax.jms.IllegalStateException
     {
         return getSession().getClientSession().createMovingReceiver(getDestination().getAddress());
     }

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueSessionImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueSessionImpl.java?rev=1165110&r1=1165109&r2=1165110&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueSessionImpl.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueSessionImpl.java Sun Sep  4 21:08:40 2011
@@ -28,6 +28,7 @@ public class QueueSessionImpl extends Se
     protected QueueSessionImpl(final ConnectionImpl connection, final AcknowledgeMode acknowledgeMode)
     {
         super(connection, acknowledgeMode);
+        setQueueSession(true);
     }
 
     public QueueReceiverImpl createReceiver(final Queue queue) throws JMSException

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java?rev=1165110&r1=1165109&r2=1165110&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java Sun Sep  4 21:08:40 2011
@@ -20,6 +20,7 @@ package org.apache.qpid.amqp_1_0.jms.imp
 
 import org.apache.qpid.amqp_1_0.client.Connection;
 import org.apache.qpid.amqp_1_0.client.Message;
+import org.apache.qpid.amqp_1_0.client.Sender;
 import org.apache.qpid.amqp_1_0.jms.QueueReceiver;
 import org.apache.qpid.amqp_1_0.jms.QueueSender;
 import org.apache.qpid.amqp_1_0.jms.QueueSession;
@@ -27,6 +28,7 @@ import org.apache.qpid.amqp_1_0.jms.Sess
 import org.apache.qpid.amqp_1_0.jms.TopicPublisher;
 import org.apache.qpid.amqp_1_0.jms.TopicSession;
 import org.apache.qpid.amqp_1_0.jms.TopicSubscriber;
+import org.apache.qpid.amqp_1_0.type.messaging.Target;
 
 import javax.jms.*;
 import javax.jms.IllegalStateException;
@@ -50,6 +52,9 @@ public class SessionImpl implements Sess
 
     private boolean _closed;
 
+    private boolean _isQueueSession;
+    private boolean _isTopicSession;
+
     protected SessionImpl(final ConnectionImpl connection, final AcknowledgeMode acknowledgeMode)
     {
         _connection = connection;
@@ -102,8 +107,7 @@ public class SessionImpl implements Sess
 
     public TextMessageImpl createTextMessage() throws JMSException
     {
-        checkClosed();
-        return new TextMessageImpl(this);
+        return createTextMessage("");
     }
 
     public TextMessageImpl createTextMessage(final String s) throws JMSException
@@ -293,6 +297,10 @@ public class SessionImpl implements Sess
             throws JMSException
     {
         checkClosed();
+        if(!(topic instanceof TopicImpl))
+        {
+            throw new InvalidDestinationException("invalid destination " + topic);
+        }
         return null;  //TODO
     }
 
@@ -317,7 +325,17 @@ public class SessionImpl implements Sess
     public TemporaryQueueImpl createTemporaryQueue() throws JMSException
     {
         checkClosed();
-        return null;  //TODO
+        try
+        {
+            Sender send = _session.createTemporaryQueueSender();
+
+            TemporaryQueueImpl tempQ = new TemporaryQueueImpl(((Target)send.getTarget()).getAddress(), send);
+            return tempQ;
+        }
+        catch (Sender.SenderCreationException e)
+        {
+            throw new JMSException("Unable to create temporary queue");
+        }
     }
 
     public TemporaryTopicImpl createTemporaryTopic() throws JMSException
@@ -329,6 +347,7 @@ public class SessionImpl implements Sess
     public void unsubscribe(final String s) throws JMSException
     {
         checkClosed();
+
         //TODO
     }
 
@@ -605,4 +624,14 @@ public class SessionImpl implements Sess
             }
         }
     }
+
+    void setQueueSession(final boolean queueSession)
+    {
+        _isQueueSession = queueSession;
+    }
+
+    void setTopicSession(final boolean topicSession)
+    {
+        _isTopicSession = topicSession;
+    }
 }

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/StreamMessageImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/StreamMessageImpl.java?rev=1165110&r1=1165109&r2=1165110&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/StreamMessageImpl.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/StreamMessageImpl.java Sun Sep  4 21:08:40 2011
@@ -37,26 +37,28 @@ public class StreamMessageImpl extends M
 
 
 
-    protected StreamMessageImpl(Header header, Properties properties, ApplicationProperties appProperties, List list,
+    protected StreamMessageImpl(Header header, MessageAnnotations messageAnnotations, Properties properties, ApplicationProperties appProperties, List list,
                                 Footer footer, SessionImpl session)
     {
-        super(header, properties, appProperties, footer, session);
+        super(header, messageAnnotations, properties, appProperties, footer, session);
         _list = list;
     }
 
     StreamMessageImpl(final SessionImpl session)
     {
-        super(new Header(), new Properties(), new ApplicationProperties(new HashMap()), new Footer(Collections.EMPTY_MAP),
+        super(new Header(), new MessageAnnotations(new HashMap()), new Properties(),
+              new ApplicationProperties(new HashMap()), new Footer(Collections.EMPTY_MAP),
               session);
         _list = new ArrayList();
     }
 
     public StreamMessageImpl(final Header header,
+                             final MessageAnnotations messageAnnotations,
                              final Properties properties,
                              final ApplicationProperties appProperties,
                              final List amqpListSection, final Footer footer)
     {
-        super(header, properties, appProperties, footer, null);
+        super(header, messageAnnotations, properties, appProperties, footer, null);
         _list = amqpListSection;
     }
 
@@ -257,6 +259,7 @@ public class StreamMessageImpl extends M
 
     public Object readObject() throws JMSException
     {
+        checkReadable();
         if(_offset == -1)
         {
             return _list.get(++_position);
@@ -332,6 +335,7 @@ public class StreamMessageImpl extends M
 
     public void reset() throws JMSException
     {
+        super.reset();
         _position = -1;
         _offset = -1;
     }
@@ -340,6 +344,10 @@ public class StreamMessageImpl extends M
     {
         List<Section> sections = new ArrayList<Section>();
         sections.add(getHeader());
+        if(getMessageAnnotations() != null && getMessageAnnotations().getValue() != null && !getMessageAnnotations().getValue().isEmpty())
+        {
+            sections.add(getMessageAnnotations());
+        }
         sections.add(getProperties());
         sections.add(getApplicationProperties());
         sections.add(new AmqpValue(_list));

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TemporaryQueueImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TemporaryQueueImpl.java?rev=1165110&r1=1165109&r2=1165110&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TemporaryQueueImpl.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TemporaryQueueImpl.java Sun Sep  4 21:08:40 2011
@@ -18,19 +18,36 @@
  */
 package org.apache.qpid.amqp_1_0.jms.impl;
 
+import org.apache.qpid.amqp_1_0.client.Sender;
 import org.apache.qpid.amqp_1_0.jms.TemporaryQueue;
 
 import javax.jms.JMSException;
 
 public class TemporaryQueueImpl extends QueueImpl implements TemporaryQueue
 {
-    protected TemporaryQueueImpl(String address)
+    private Sender _sender;
+
+    protected TemporaryQueueImpl(String address, Sender sender)
     {
         super(address);
+        _sender = sender;
     }
 
     public void delete() throws JMSException
     {
-        //TODO
+        try
+        {
+            if(_sender != null)
+            {
+                _sender.close();
+                _sender = null;
+            }
+        }
+        catch (Sender.SenderClosingException e)
+        {
+            final JMSException jmsException = new JMSException(e.getMessage());
+            jmsException.setLinkedException(e);
+            throw jmsException;
+        }
     }
 }

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TextMessageImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TextMessageImpl.java?rev=1165110&r1=1165109&r2=1165110&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TextMessageImpl.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TextMessageImpl.java Sun Sep  4 21:08:40 2011
@@ -33,19 +33,21 @@ public class TextMessageImpl extends Mes
     private String _text;
 
     protected TextMessageImpl(Header header,
+                              MessageAnnotations messageAnnotations,
                               Properties properties,
                               ApplicationProperties appProperties,
                               String text,
                               Footer footer,
                               SessionImpl session)
     {
-        super(header, properties, appProperties, footer, session);
+        super(header, messageAnnotations, properties, appProperties, footer, session);
         _text = text;
     }
 
     protected TextMessageImpl(final SessionImpl session)
     {
-        super(new Header(), new Properties(), new ApplicationProperties(new HashMap()), new Footer(Collections.EMPTY_MAP),
+        super(new Header(), new MessageAnnotations(new HashMap()),
+              new Properties(), new ApplicationProperties(new HashMap()), new Footer(Collections.EMPTY_MAP),
               session);
     }
 
@@ -68,6 +70,10 @@ public class TextMessageImpl extends Mes
     {
         List<Section> sections = new ArrayList<Section>();
         sections.add(getHeader());
+        if(getMessageAnnotations() != null && getMessageAnnotations().getValue() != null && !getMessageAnnotations().getValue().isEmpty())
+        {
+            sections.add(getMessageAnnotations());
+        }
         sections.add(getProperties());
         sections.add(getApplicationProperties());
         AmqpValue section = new AmqpValue(_text);
@@ -75,4 +81,6 @@ public class TextMessageImpl extends Mes
         sections.add(getFooter());
         return sections;
     }
+
+
 }

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TopicImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TopicImpl.java?rev=1165110&r1=1165109&r2=1165110&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TopicImpl.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TopicImpl.java Sun Sep  4 21:08:40 2011
@@ -49,5 +49,8 @@ public class TopicImpl extends Destinati
         return topic;
     }
 
-
+    public static TopicImpl valueOf(String address)
+    {
+        return address == null ? null : createTopic(address);
+    }
 }

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TopicSessionImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TopicSessionImpl.java?rev=1165110&r1=1165109&r2=1165110&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TopicSessionImpl.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TopicSessionImpl.java Sun Sep  4 21:08:40 2011
@@ -28,6 +28,7 @@ public class TopicSessionImpl extends Se
     protected TopicSessionImpl(final ConnectionImpl connection, final AcknowledgeMode acknowledgeMode)
     {
         super(connection, acknowledgeMode);
+        setTopicSession(true);
     }
 
     public TopicSubscriberImpl createSubscriber(final Topic topic) throws JMSException

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TopicSubscriberImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TopicSubscriberImpl.java?rev=1165110&r1=1165109&r2=1165110&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TopicSubscriberImpl.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TopicSubscriberImpl.java Sun Sep  4 21:08:40 2011
@@ -22,7 +22,7 @@ import org.apache.qpid.amqp_1_0.client.R
 import org.apache.qpid.amqp_1_0.jms.Topic;
 import org.apache.qpid.amqp_1_0.jms.TopicSubscriber;
 
-import javax.jms.JMSException;
+import javax.jms.*;
 
 public class TopicSubscriberImpl extends MessageConsumerImpl implements TopicSubscriber
 {
@@ -33,6 +33,7 @@ public class TopicSubscriberImpl extends
             throws JMSException
     {
         super(destination, session, selector, noLocal);
+        setTopicSubscriber(true);
     }
 
     public TopicImpl getTopic() throws JMSException
@@ -41,7 +42,7 @@ public class TopicSubscriberImpl extends
     }
 
 
-    protected Receiver createClientReceiver()
+    protected Receiver createClientReceiver() throws javax.jms.IllegalStateException
     {
         return getSession().getClientSession().createCopyingReceiver(getDestination().getAddress());
     }



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org