You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2011/10/03 14:58:02 UTC

svn commit: r1178400 - in /qpid/branches/rg-amqp-1-0-sandbox/qpid/java: amqp-1-0-client-jms/example/src/main/java/org/apache/qpid/amqp_1_0/jms/example/ amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ amqp-1-0-client/src/main/java/o...

Author: rgodfrey
Date: Mon Oct  3 12:58:01 2011
New Revision: 1178400

URL: http://svn.apache.org/viewvc?rev=1178400&view=rev
Log:
NO-JIRA : 1-0 Prototype JMS updates

Modified:
    qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/example/src/main/java/org/apache/qpid/amqp_1_0/jms/example/Hello.java
    qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/BytesMessageImpl.java
    qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/DestinationImpl.java
    qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MapMessageImpl.java
    qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java
    qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageImpl.java
    qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueReceiverImpl.java
    qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java
    qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TemporaryTopicImpl.java
    qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TextMessageImpl.java
    qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TopicSubscriberImpl.java
    qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Filereceiver.java
    qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receive.java
    qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java
    qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Request.java
    qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Respond.java
    qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Session.java
    qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/transport/codec/ErrorConstructor.java
    qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData_1_0.java
    qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
    qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/example/src/main/java/org/apache/qpid/amqp_1_0/jms/example/Hello.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/example/src/main/java/org/apache/qpid/amqp_1_0/jms/example/Hello.java?rev=1178400&r1=1178399&r2=1178400&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/example/src/main/java/org/apache/qpid/amqp_1_0/jms/example/Hello.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/example/src/main/java/org/apache/qpid/amqp_1_0/jms/example/Hello.java Mon Oct  3 12:58:01 2011
@@ -59,7 +59,7 @@ public class Hello
 
 
             Session consumerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-            MessageConsumer messageConsumer = consumerSession.createConsumer(queue, "hello='true'");
+            MessageConsumer messageConsumer = consumerSession.createConsumer(queue, "hello='true' and 7");
 
             messageConsumer.setMessageListener(new MessageListener()
             {

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/BytesMessageImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/BytesMessageImpl.java?rev=1178400&r1=1178399&r2=1178400&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/BytesMessageImpl.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/BytesMessageImpl.java Mon Oct  3 12:58:01 2011
@@ -469,7 +469,21 @@ public class BytesMessageImpl extends Me
 
     public void reset() throws JMSException
     {
-        //TODO
+        if(_bytesOut != null)
+        {
+            byte[] data = _bytesOut.toByteArray();
+            _dataIn = new Data(new Binary(data));
+            _dataAsInput = new DataInputStream(new ByteArrayInputStream(data));
+            _dataAsOutput = null;
+            _bytesOut = null;
+        }
+        else
+        {
+
+            final Binary dataBuffer = _dataIn.getValue();
+            _dataAsInput = new DataInputStream(new ByteArrayInputStream(dataBuffer.getArray(),dataBuffer.getArrayOffset(),dataBuffer.getLength()));
+
+        }
     }
 
     private JMSException handleInputException(final IOException e)
@@ -496,6 +510,15 @@ public class BytesMessageImpl extends Me
         return ex;
     }
 
+    @Override
+    public void clearBody() throws JMSException
+    {
+        super.clearBody();
+        _bytesOut = new ByteArrayOutputStream();
+        _dataAsOutput = new DataOutputStream(_bytesOut);
+        _dataAsInput = null;
+    }
+
     @Override Collection<Section> getSections()
     {
         List<Section> sections = new ArrayList<Section>();

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/DestinationImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/DestinationImpl.java?rev=1178400&r1=1178399&r2=1178400&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/DestinationImpl.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/DestinationImpl.java Mon Oct  3 12:58:01 2011
@@ -20,10 +20,13 @@
 package org.apache.qpid.amqp_1_0.jms.impl;
 
 import org.apache.qpid.amqp_1_0.jms.Destination;
+import org.apache.qpid.amqp_1_0.jms.Queue;
+import org.apache.qpid.amqp_1_0.jms.Topic;
 
+import javax.jms.JMSException;
 import java.util.WeakHashMap;
 
-public class DestinationImpl implements Destination
+public class DestinationImpl implements Destination, Queue, Topic
 {
     private static final WeakHashMap<String, DestinationImpl> DESTINATION_CACHE =
             new WeakHashMap<String, DestinationImpl>();
@@ -69,4 +72,14 @@ public class DestinationImpl implements 
         }
         return destination;
     }
+
+    public String getQueueName() throws JMSException
+    {
+        return getAddress();
+    }
+
+    public String getTopicName() throws JMSException
+    {
+        return getAddress();
+    }
 }

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MapMessageImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MapMessageImpl.java?rev=1178400&r1=1178399&r2=1178400&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MapMessageImpl.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MapMessageImpl.java Mon Oct  3 12:58:01 2011
@@ -20,6 +20,7 @@
 package org.apache.qpid.amqp_1_0.jms.impl;
 
 import org.apache.qpid.amqp_1_0.jms.MapMessage;
+import org.apache.qpid.amqp_1_0.type.Binary;
 import org.apache.qpid.amqp_1_0.type.Section;
 import org.apache.qpid.amqp_1_0.type.messaging.*;
 import org.apache.qpid.amqp_1_0.type.messaging.Properties;
@@ -240,7 +241,7 @@ public class MapMessageImpl extends Mess
         {
             return (String) value;
         }
-        else if (value instanceof byte[])
+        else if (value instanceof byte[] || value instanceof Binary)
         {
             throw new MessageFormatException("Property " + name + " of type byte[] " + "cannot be converted to String.");
         }
@@ -262,6 +263,10 @@ public class MapMessageImpl extends Mess
         {
             return (byte[]) value;
         }
+        else if(value instanceof Binary)
+        {
+            return ((Binary)value).getArray();
+        }
         else
         {
             throw new MessageFormatException("Property " + name + " of type " + value.getClass().getName()
@@ -270,7 +275,8 @@ public class MapMessageImpl extends Mess
 
     public Object getObject(String s) throws JMSException
     {
-        return get(s);
+        Object val = get(s);
+        return val instanceof Binary ? ((Binary)val).getArray() : val;
     }
 
     public Enumeration getMapNames() throws JMSException
@@ -362,7 +368,7 @@ public class MapMessageImpl extends Mess
             System.arraycopy(bytes,offset,val,0,length);
         }
 
-        put(name, val);
+        put(name, new Binary(val));
     }
 
     public void setObject(String name, Object value) throws JMSException
@@ -406,6 +412,13 @@ public class MapMessageImpl extends Mess
         return _map.keySet();
     }
 
+    @Override
+    public void clearBody() throws JMSException
+    {
+        super.clearBody();
+        _map.clear();
+    }
+
     private void checkPropertyName(String propName)
     {
         if ((propName == null) || propName.equals(""))

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java?rev=1178400&r1=1178399&r2=1178400&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java Mon Oct  3 12:58:01 2011
@@ -26,13 +26,12 @@ import org.apache.qpid.amqp_1_0.jms.Queu
 import org.apache.qpid.amqp_1_0.jms.Queue;
 import org.apache.qpid.amqp_1_0.jms.Topic;
 import org.apache.qpid.amqp_1_0.jms.TopicSubscriber;
-import org.apache.qpid.amqp_1_0.type.Binary;
-import org.apache.qpid.amqp_1_0.type.Outcome;
-import org.apache.qpid.amqp_1_0.type.Symbol;
-import org.apache.qpid.amqp_1_0.type.UnsignedInteger;
+import org.apache.qpid.amqp_1_0.type.*;
 import org.apache.qpid.amqp_1_0.type.messaging.Filter;
 import org.apache.qpid.amqp_1_0.type.messaging.JMSSelectorFilter;
 import org.apache.qpid.amqp_1_0.type.messaging.NoLocalFilter;
+import org.apache.qpid.amqp_1_0.type.transport.*;
+import org.apache.qpid.amqp_1_0.type.transport.Error;
 
 import javax.jms.*;
 import javax.jms.IllegalStateException;
@@ -79,18 +78,37 @@ public class MessageConsumerImpl impleme
         }
         else
         {
-            throw new InvalidDestinationException("Invalid destination class");
+            throw new InvalidDestinationException("Invalid destination class " + destination.getClass().getName());
         }
         _session = session;
 
         _receiver = createClientReceiver();
 
+
     }
 
-    protected Receiver createClientReceiver() throws IllegalStateException
+    protected Receiver createClientReceiver() throws JMSException
     {
-        return _session.getClientSession(). createReceiver(_destination.getAddress(), AcknowledgeMode.ALO,
-                                                           null, false, getFilters(), null);
+        try
+        {
+            return _session.getClientSession(). createReceiver(_destination.getAddress(), AcknowledgeMode.ALO,
+                                                               null, false, getFilters(), null);
+        }
+        catch (AmqpErrorException e)
+        {
+            Error error = e.getError();
+            if(AmqpError.INVALID_FIELD.equals(error.getCondition())
+                &&  error.getInfo() != null && Symbol.valueOf("filter").equals(error.getInfo().get(Symbol.valueOf
+                    ("field"))))
+            {
+                throw new InvalidSelectorException(e.getMessage());
+            }
+            else
+            {
+                throw new JMSException(e.getMessage(), error.getCondition().getValue().toString());
+
+            }
+        }
     }
 
     private Map<Symbol, Filter> getFilters()

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageImpl.java?rev=1178400&r1=1178399&r2=1178400&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageImpl.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageImpl.java Mon Oct  3 12:58:01 2011
@@ -50,7 +50,7 @@ public abstract class MessageImpl implem
     static final Set<Class> _supportedClasses =
                 new HashSet<Class>(Arrays.asList(Boolean.class, Byte.class, Short.class, Integer.class, Long.class,
                                                  Float.class, Double.class, Character.class, String.class, byte[].class));
-    private static final Symbol JMS_TYPE = Symbol.valueOf("apache.qpid.amqp_1_0-jms-type");
+    private static final Symbol JMS_TYPE = Symbol.valueOf("jms-type");
 
     private Header _header;
     private Properties _properties;

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueReceiverImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueReceiverImpl.java?rev=1178400&r1=1178399&r2=1178400&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueReceiverImpl.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueReceiverImpl.java Mon Oct  3 12:58:01 2011
@@ -21,6 +21,7 @@ package org.apache.qpid.amqp_1_0.jms.imp
 import org.apache.qpid.amqp_1_0.client.Receiver;
 import org.apache.qpid.amqp_1_0.jms.Queue;
 import org.apache.qpid.amqp_1_0.jms.QueueReceiver;
+import org.apache.qpid.amqp_1_0.type.AmqpErrorException;
 
 import javax.jms.*;
 
@@ -36,9 +37,16 @@ public class QueueReceiverImpl extends M
         setQueueConsumer(true);
     }
 
-    protected Receiver createClientReceiver() throws javax.jms.IllegalStateException
+    protected Receiver createClientReceiver() throws JMSException
     {
-        return getSession().getClientSession().createMovingReceiver(getDestination().getAddress());
+        try
+        {
+            return getSession().getClientSession().createMovingReceiver(getDestination().getAddress());
+        }
+        catch (AmqpErrorException e)
+        {
+            throw new JMSException(e.getMessage(), e.getError().getCondition().toString());
+        }
     }
 
     public Queue getQueue() throws JMSException

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java?rev=1178400&r1=1178399&r2=1178400&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java Mon Oct  3 12:58:01 2011
@@ -341,7 +341,17 @@ public class SessionImpl implements Sess
     public TemporaryTopicImpl createTemporaryTopic() throws JMSException
     {
         checkClosed();
-        return null;  //TODO
+        try
+        {
+            Sender send = _session.createTemporaryQueueSender();
+
+            TemporaryTopicImpl tempQ = new TemporaryTopicImpl(((Target)send.getTarget()).getAddress(), send);
+            return tempQ;
+        }
+        catch (Sender.SenderCreationException e)
+        {
+            throw new JMSException("Unable to create temporary queue");
+        }
     }
 
     public void unsubscribe(final String s) throws JMSException

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TemporaryTopicImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TemporaryTopicImpl.java?rev=1178400&r1=1178399&r2=1178400&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TemporaryTopicImpl.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TemporaryTopicImpl.java Mon Oct  3 12:58:01 2011
@@ -18,19 +18,37 @@
  */
 package org.apache.qpid.amqp_1_0.jms.impl;
 
+import org.apache.qpid.amqp_1_0.client.Sender;
 import org.apache.qpid.amqp_1_0.jms.TemporaryTopic;
 
 import javax.jms.JMSException;
 
 public class TemporaryTopicImpl extends TopicImpl implements TemporaryTopic
 {
-    protected TemporaryTopicImpl(String address)
+    private Sender _sender;
+
+    protected TemporaryTopicImpl(String address, Sender sender)
     {
         super(address);
+        _sender = sender;
     }
 
     public void delete() throws JMSException
     {
-        //TODO
+        try
+        {
+            if(_sender != null)
+            {
+                _sender.close();
+                _sender = null;
+            }
+        }
+        catch (Sender.SenderClosingException e)
+        {
+            final JMSException jmsException = new JMSException(e.getMessage());
+            jmsException.setLinkedException(e);
+            throw jmsException;
+        }
     }
+
 }

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TextMessageImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TextMessageImpl.java?rev=1178400&r1=1178399&r2=1178400&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TextMessageImpl.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TextMessageImpl.java Mon Oct  3 12:58:01 2011
@@ -66,6 +66,13 @@ public class TextMessageImpl extends Mes
         return _text;
     }
 
+    @Override
+    public void clearBody() throws JMSException
+    {
+        super.clearBody();
+        _text = null;
+    }
+
     @Override Collection<Section> getSections()
     {
         List<Section> sections = new ArrayList<Section>();

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TopicSubscriberImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TopicSubscriberImpl.java?rev=1178400&r1=1178399&r2=1178400&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TopicSubscriberImpl.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TopicSubscriberImpl.java Mon Oct  3 12:58:01 2011
@@ -21,6 +21,7 @@ package org.apache.qpid.amqp_1_0.jms.imp
 import org.apache.qpid.amqp_1_0.client.Receiver;
 import org.apache.qpid.amqp_1_0.jms.Topic;
 import org.apache.qpid.amqp_1_0.jms.TopicSubscriber;
+import org.apache.qpid.amqp_1_0.type.AmqpErrorException;
 
 import javax.jms.*;
 
@@ -42,9 +43,16 @@ public class TopicSubscriberImpl extends
     }
 
 
-    protected Receiver createClientReceiver() throws javax.jms.IllegalStateException
+    protected Receiver createClientReceiver() throws JMSException
     {
-        return getSession().getClientSession().createCopyingReceiver(getDestination().getAddress());
+        try
+        {
+            return getSession().getClientSession().createCopyingReceiver(getDestination().getAddress());
+        }
+        catch (AmqpErrorException e)
+        {
+            throw new JMSException(e.getMessage(), e.getError().getCondition().toString());
+        }
     }
 
 }

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Filereceiver.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Filereceiver.java?rev=1178400&r1=1178399&r2=1178400&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Filereceiver.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Filereceiver.java Mon Oct  3 12:58:01 2011
@@ -1,9 +1,6 @@
 package org.apache.qpid.amqp_1_0.client;
 
-import org.apache.qpid.amqp_1_0.type.Binary;
-import org.apache.qpid.amqp_1_0.type.Outcome;
-import org.apache.qpid.amqp_1_0.type.Section;
-import org.apache.qpid.amqp_1_0.type.UnsignedInteger;
+import org.apache.qpid.amqp_1_0.type.*;
 import org.apache.qpid.amqp_1_0.type.messaging.*;
 import org.apache.commons.cli.HelpFormatter;
 import org.apache.commons.cli.Options;
@@ -316,6 +313,10 @@ public class Filereceiver extends Util
         {
             e.printStackTrace();  //TODO.
         }
+        catch (AmqpErrorException e)
+        {
+            e.printStackTrace();  //TODO.
+        }
 
     }
 

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receive.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receive.java?rev=1178400&r1=1178399&r2=1178400&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receive.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receive.java Mon Oct  3 12:58:01 2011
@@ -21,6 +21,7 @@
 
 package org.apache.qpid.amqp_1_0.client;
 
+import org.apache.qpid.amqp_1_0.type.AmqpErrorException;
 import org.apache.qpid.amqp_1_0.type.Symbol;
 import org.apache.qpid.amqp_1_0.type.UnsignedInteger;
 import org.apache.qpid.amqp_1_0.type.UnsignedLong;
@@ -229,6 +230,10 @@ public class Receive extends Util
         {
             e.printStackTrace();  //TODO.
         }
+        catch (AmqpErrorException e)
+        {
+            e.printStackTrace();  //TODO.
+        }
 
     }
 

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java?rev=1178400&r1=1178399&r2=1178400&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java Mon Oct  3 12:58:01 2011
@@ -22,6 +22,7 @@ package org.apache.qpid.amqp_1_0.client;
 
 import org.apache.qpid.amqp_1_0.messaging.SectionDecoder;
 import org.apache.qpid.amqp_1_0.transport.DeliveryStateHandler;
+import org.apache.qpid.amqp_1_0.transport.LinkEndpoint;
 import org.apache.qpid.amqp_1_0.transport.ReceivingLinkEndpoint;
 import org.apache.qpid.amqp_1_0.transport.ReceivingLinkListener;
 
@@ -31,9 +32,8 @@ import org.apache.qpid.amqp_1_0.type.mes
 import org.apache.qpid.amqp_1_0.type.messaging.Source;
 import org.apache.qpid.amqp_1_0.type.messaging.Target;
 import org.apache.qpid.amqp_1_0.type.transaction.TransactionalState;
-import org.apache.qpid.amqp_1_0.type.transport.ReceiverSettleMode;
-import org.apache.qpid.amqp_1_0.type.transport.SenderSettleMode;
-import org.apache.qpid.amqp_1_0.type.transport.Transfer;
+import org.apache.qpid.amqp_1_0.type.transport.*;
+import org.apache.qpid.amqp_1_0.type.transport.Error;
 
 import java.nio.ByteBuffer;
 import java.util.*;
@@ -49,12 +49,13 @@ public class Receiver implements Deliver
     private Queue<Transfer> _prefetchQueue = new ConcurrentLinkedQueue<Transfer>();
     private Map<Binary, SettledAction> _unsettledMap = new HashMap<Binary, SettledAction>();
     private MessageArrivalListener _messageArrivalListener;
+    private org.apache.qpid.amqp_1_0.type.transport.Error _error;
 
     public Receiver(final Session session,
                     final String linkName,
                     final Target target,
                     final Source source,
-                    final AcknowledgeMode ackMode)
+                    final AcknowledgeMode ackMode) throws AmqpErrorException
     {
         this(session, linkName, target, source, ackMode, false);
     }
@@ -64,7 +65,7 @@ public class Receiver implements Deliver
                     final Target target,
                     final Source source,
                     final AcknowledgeMode ackMode,
-                    boolean isDurable)
+                    boolean isDurable) throws AmqpErrorException
     {
         this(session,linkName,target,source,ackMode,isDurable,null);
     }
@@ -75,7 +76,7 @@ public class Receiver implements Deliver
                     final Source source,
                     final AcknowledgeMode ackMode,
                     final boolean isDurable,
-                    final Map<Binary,Outcome> unsettled)
+                    final Map<Binary,Outcome> unsettled) throws AmqpErrorException
     {
 
         _session = session;
@@ -113,6 +114,13 @@ public class Receiver implements Deliver
                 _prefetchQueue.add(xfr);
                 postPrefetchAction();
             }
+
+            @Override
+            public void remoteDetached(final LinkEndpoint endpoint, final Detach detach)
+            {
+                _error = detach.getError();
+                super.remoteDetached(endpoint, detach);
+            }
         });
 
         _endpoint.setLocalUnsettled(unsettled);
@@ -121,7 +129,7 @@ public class Receiver implements Deliver
 
         synchronized(_endpoint.getLock())
         {
-            while(!_endpoint.isAttached() || _endpoint.isDetached())
+            while(!_endpoint.isAttached() && !_endpoint.isDetached())
             {
                 try
                 {
@@ -133,6 +141,25 @@ public class Receiver implements Deliver
                 }
             }
         }
+
+        if(_endpoint.getSource() == null)
+        {
+            synchronized(_endpoint.getLock())
+            {
+                while(!_endpoint.isDetached())
+                {
+                    try
+                    {
+                        _endpoint.getLock().wait();
+                    }
+                    catch (InterruptedException e)
+                    {
+                        e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
+                    }
+                }
+            }
+            throw new AmqpErrorException(getError());
+        }
     }
 
     private void postPrefetchAction()
@@ -257,15 +284,15 @@ public class Receiver implements Deliver
             synchronized(lock)
             {
                 Transfer xfr;
-                while(((xfr = _prefetchQueue.peek()) == null) && !_endpoint.isDrained())
+                while(((xfr = _prefetchQueue.peek()) == null) && !_endpoint.isDrained() && wait != 0)
                 {
                     try
                     {
-                        if(wait>=0L)
+                        if(wait>0L)
                         {
                             lock.wait(wait);
                         }
-                        else
+                        else if(wait<0L)
                         {
                             lock.wait();
                         }
@@ -356,6 +383,11 @@ public class Receiver implements Deliver
         _endpoint.updateDisposition(deliveryTag,state, settled);
     }
 
+    public Error getError()
+    {
+        return _error;
+    }
+
     public void acknowledgeAll(Message m)
     {
         acknowledgeAll(m.getDeliveryTag());

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Request.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Request.java?rev=1178400&r1=1178399&r2=1178400&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Request.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Request.java Mon Oct  3 12:58:01 2011
@@ -21,6 +21,7 @@
 
 package org.apache.qpid.amqp_1_0.client;
 
+import org.apache.qpid.amqp_1_0.type.AmqpErrorException;
 import org.apache.qpid.amqp_1_0.type.Section;
 import org.apache.qpid.amqp_1_0.type.UnsignedInteger;
 import org.apache.qpid.amqp_1_0.type.UnsignedLong;
@@ -227,6 +228,10 @@ public class Request extends Util
         {
             e.printStackTrace();  //TODO.
         }
+        catch (AmqpErrorException e)
+        {
+            e.printStackTrace();  //TODO.
+        }
 
     }
 

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Respond.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Respond.java?rev=1178400&r1=1178399&r2=1178400&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Respond.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Respond.java Mon Oct  3 12:58:01 2011
@@ -21,6 +21,7 @@
 
 package org.apache.qpid.amqp_1_0.client;
 
+import org.apache.qpid.amqp_1_0.type.AmqpErrorException;
 import org.apache.qpid.amqp_1_0.type.Section;
 import org.apache.qpid.amqp_1_0.type.UnsignedInteger;
 import org.apache.qpid.amqp_1_0.type.UnsignedLong;
@@ -281,6 +282,10 @@ public class Respond extends Util
         {
             e.printStackTrace();  //TODO.
         }
+        catch (AmqpErrorException e)
+        {
+            e.printStackTrace();  //TODO.
+        }
     }
 
     private void respond(Message m) throws Sender.SenderCreationException

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Session.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Session.java?rev=1178400&r1=1178399&r2=1178400&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Session.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Session.java Mon Oct  3 12:58:01 2011
@@ -26,10 +26,7 @@ import org.apache.qpid.amqp_1_0.messagin
 import org.apache.qpid.amqp_1_0.messaging.SectionEncoderImpl;
 import org.apache.qpid.amqp_1_0.transport.SendingLinkEndpoint;
 import org.apache.qpid.amqp_1_0.transport.SessionEndpoint;
-import org.apache.qpid.amqp_1_0.type.Binary;
-import org.apache.qpid.amqp_1_0.type.DistributionMode;
-import org.apache.qpid.amqp_1_0.type.Outcome;
-import org.apache.qpid.amqp_1_0.type.Symbol;
+import org.apache.qpid.amqp_1_0.type.*;
 import org.apache.qpid.amqp_1_0.type.messaging.Filter;
 import org.apache.qpid.amqp_1_0.type.messaging.Source;
 import org.apache.qpid.amqp_1_0.type.messaging.StdDistMode;
@@ -104,73 +101,79 @@ public class Session
     }
 
 
-    public Receiver createReceiver(final String sourceAddr)
+    public Receiver createReceiver(final String sourceAddr) throws AmqpErrorException
     {
         return createReceiver(sourceAddr, null, AcknowledgeMode.ALO);
     }
 
 
-    public Receiver createReceiver(final String queue, final AcknowledgeMode mode)
+    public Receiver createReceiver(final String queue, final AcknowledgeMode mode) throws AmqpErrorException
     {
         return createReceiver(queue, null, mode);
     }
 
     public Receiver createReceiver(final String queue, final AcknowledgeMode mode, String linkName)
+            throws AmqpErrorException
     {
         return createReceiver(queue, null, mode, linkName);
     }
 
     public Receiver createReceiver(final String queue, final AcknowledgeMode mode, String linkName, boolean isDurable)
+            throws AmqpErrorException
     {
         return createReceiver(queue, null, mode, linkName, isDurable);
     }
 
     public Receiver createReceiver(final String queue, final AcknowledgeMode mode, String linkName, boolean isDurable,
                                    Map<Symbol, Filter> filters, Map<Binary, Outcome> unsettled)
+            throws AmqpErrorException
     {
         return createReceiver(queue, null, mode, linkName, isDurable, filters, unsettled);
     }
 
 
     public Receiver createReceiver(final String queue, final AcknowledgeMode mode, String linkName,
-                                   boolean isDurable, Map<Binary, Outcome> unsettled)
+                                   boolean isDurable, Map<Binary, Outcome> unsettled) throws AmqpErrorException
     {
         return createReceiver(queue, null, mode, linkName, isDurable, unsettled);
     }
 
 
     private synchronized Receiver createReceiver(final String sourceAddr, DistributionMode mode)
+            throws AmqpErrorException
     {
         return createReceiver(sourceAddr, mode, AcknowledgeMode.ALO);
     }
 
     private synchronized Receiver createReceiver(final String sourceAddr, DistributionMode mode, String linkName)
+            throws AmqpErrorException
     {
         return createReceiver(sourceAddr, mode, AcknowledgeMode.ALO, linkName);
     }
 
 
     private synchronized Receiver createReceiver(final String sourceAddr, DistributionMode mode,
-                                            final AcknowledgeMode ackMode)
+                                            final AcknowledgeMode ackMode) throws AmqpErrorException
     {
         return createReceiver(sourceAddr, mode, ackMode, null);
     }
 
     private synchronized Receiver createReceiver(final String sourceAddr, DistributionMode mode,
-                                            final AcknowledgeMode ackMode, String linkName)
+                                            final AcknowledgeMode ackMode, String linkName) throws AmqpErrorException
     {
         return createReceiver(sourceAddr,mode, ackMode, linkName, false);
     }
 
     private synchronized Receiver createReceiver(final String sourceAddr, DistributionMode mode,
                                             final AcknowledgeMode ackMode, String linkName, boolean isDurable)
+            throws AmqpErrorException
     {
         return createReceiver(sourceAddr, mode, ackMode, linkName, isDurable, null);
     }
 
     private synchronized Receiver createReceiver(final String sourceAddr, DistributionMode mode,
                                             final AcknowledgeMode ackMode, String linkName, boolean isDurable,
-                                            Map<Binary, Outcome> unsettled)
+                                            Map<Binary, Outcome> unsettled) throws AmqpErrorException
     {
         return createReceiver(sourceAddr,mode,ackMode, linkName, isDurable, null, unsettled);
     }
@@ -178,6 +181,7 @@ public class Session
     private synchronized Receiver createReceiver(final String sourceAddr, DistributionMode mode,
                                             final AcknowledgeMode ackMode, String linkName, boolean isDurable,
                                             Map<Symbol, Filter> filters, Map<Binary, Outcome> unsettled)
+            throws AmqpErrorException
     {
 
         final Target target = new Target();
@@ -200,17 +204,17 @@ public class Session
 
     }
 
-    public synchronized Receiver createCopyingReceiver(final String sourceAddr)
+    public synchronized Receiver createCopyingReceiver(final String sourceAddr) throws AmqpErrorException
     {
         return createReceiver(sourceAddr, StdDistMode.COPY);
     }
 
-    public synchronized Receiver createMovingReceiver(final String sourceAddr)
+    public synchronized Receiver createMovingReceiver(final String sourceAddr) throws AmqpErrorException
     {
         return createReceiver(sourceAddr, StdDistMode.MOVE);
     }
 
-    public Receiver createTemporaryQueueReceiver()
+    public Receiver createTemporaryQueueReceiver() throws AmqpErrorException
     {
         Source source = new Source();
         source.setDynamic(true);

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/transport/codec/ErrorConstructor.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/transport/codec/ErrorConstructor.java?rev=1178400&r1=1178399&r2=1178400&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/transport/codec/ErrorConstructor.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/transport/codec/ErrorConstructor.java Mon Oct  3 12:58:01 2011
@@ -26,6 +26,7 @@ package org.apache.qpid.amqp_1_0.type.tr
 import org.apache.qpid.amqp_1_0.codec.DescribedTypeConstructor;
 import org.apache.qpid.amqp_1_0.codec.DescribedTypeConstructorRegistry;
 import org.apache.qpid.amqp_1_0.type.*;
+import org.apache.qpid.amqp_1_0.type.transaction.TransactionErrors;
 import org.apache.qpid.amqp_1_0.type.transport.*;
 
 
@@ -65,17 +66,32 @@ public class ErrorConstructor extends De
 
                 if(val != null)
                 {
-
-                    try
+                    if(val instanceof ErrorCondition)
                     {
                         obj.setCondition( (ErrorCondition) val );
                     }
-                    catch(ClassCastException e)
+                    else if(val instanceof Symbol)
                     {
-
-                        // TODO Error
+                        ErrorCondition condition = null;
+                        condition = AmqpError.valueOf(val);
+                        if(condition == null)
+                        {
+                            condition = ConnectionError.valueOf(val);
+                            if(condition == null)
+                            {
+                                condition = SessionError.valueOf(val);
+                                if(condition == null)
+                                {
+                                    condition = LinkError.valueOf(val);
+                                    if(condition == null)
+                                    {
+                                        condition = TransactionErrors.valueOf(val);
+                                    }
+                                }
+                            }
+                        }
+                        obj.setCondition(condition);
                     }
-
                 }
 
 

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData_1_0.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData_1_0.java?rev=1178400&r1=1178399&r2=1178400&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData_1_0.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData_1_0.java Mon Oct  3 12:58:01 2011
@@ -42,7 +42,7 @@ import java.util.*;
 public class MessageMetaData_1_0 implements StorableMessageMetaData
 {
     // TODO move to somewhere more useful
-    public static final Symbol JMS_TYPE = Symbol.valueOf("x-jms-type");
+    public static final Symbol JMS_TYPE = Symbol.valueOf("jms-type");
 
 
     private Header _header;
@@ -437,13 +437,13 @@ public class MessageMetaData_1_0 impleme
         public String getType()
         {
 
-            if(_appProperties == null || _appProperties.get(JMS_TYPE) == null)
+            if(_messageAnnotations == null || _messageAnnotations.get(JMS_TYPE) == null)
             {
                 return null;
             }
             else
             {
-                return _appProperties.get(JMS_TYPE).toString();
+                return _messageAnnotations.get(JMS_TYPE).toString();
             }
         }
 

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java?rev=1178400&r1=1178399&r2=1178400&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java Mon Oct  3 12:58:01 2011
@@ -31,9 +31,9 @@ import org.apache.qpid.amqp_1_0.type.*;
 
 import org.apache.qpid.amqp_1_0.type.messaging.*;
 import org.apache.qpid.amqp_1_0.type.messaging.Source;
-import org.apache.qpid.amqp_1_0.type.transport.Detach;
-import org.apache.qpid.amqp_1_0.type.transport.Transfer;
+import org.apache.qpid.amqp_1_0.type.transport.*;
 import org.apache.qpid.AMQException;
+import org.apache.qpid.amqp_1_0.type.transport.Error;
 import org.apache.qpid.server.exchange.DirectExchange;
 import org.apache.qpid.server.exchange.Exchange;
 import org.apache.qpid.server.exchange.ExchangeType;
@@ -71,6 +71,7 @@ public class SendingLink_1_0 implements 
     public SendingLink_1_0(final SendingLinkAttachment linkAttachment,
                            final VirtualHost vhost,
                            final SendingDestination destination)
+            throws AmqpErrorException
     {
         _vhost = vhost;
         _destination = destination;
@@ -99,29 +100,36 @@ public class SendingLink_1_0 implements 
 
             Map<Symbol,Filter> actualFilters = new HashMap<Symbol,Filter>();
 
-            for(Map.Entry<Symbol,Filter> entry : filters.entrySet())
+            if(filters != null)
             {
-                if(entry.getValue() instanceof NoLocalFilter)
+                for(Map.Entry<Symbol,Filter> entry : filters.entrySet())
                 {
-                    actualFilters.put(entry.getKey(), entry.getValue());
-                    noLocal = true;
-                }
-                else if(messageFilter == null && entry.getValue() instanceof JMSSelectorFilter)
-                {
-
-                    JMSSelectorFilter selectorFilter = (JMSSelectorFilter) entry.getValue();
-                    try
+                    if(entry.getValue() instanceof NoLocalFilter)
                     {
-                        messageFilter = new JMSSelectorMessageFilter(selectorFilter.getValue());
-
                         actualFilters.put(entry.getKey(), entry.getValue());
+                        noLocal = true;
                     }
-                    catch (AMQInvalidArgumentException e)
+                    else if(messageFilter == null && entry.getValue() instanceof JMSSelectorFilter)
                     {
 
-                    }
+                        JMSSelectorFilter selectorFilter = (JMSSelectorFilter) entry.getValue();
+                        try
+                        {
+                            messageFilter = new JMSSelectorMessageFilter(selectorFilter.getValue());
+
+                            actualFilters.put(entry.getKey(), entry.getValue());
+                        }
+                        catch (AMQInvalidArgumentException e)
+                        {
+                            Error error = new Error();
+                            error.setCondition(AmqpError.INVALID_FIELD);
+                            error.setDescription("Invalid JMS Selector: " + selectorFilter.getValue());
+                            error.setInfo(Collections.singletonMap(Symbol.valueOf("field"), Symbol.valueOf("filter")));
+                            throw new AmqpErrorException(error);
+                        }
 
 
+                    }
                 }
             }
             source.setFilter(actualFilters.isEmpty() ? null : actualFilters);
@@ -182,7 +190,10 @@ public class SendingLink_1_0 implements 
 
         _subscription = new Subscription_1_0(this, qd);
         _subscription.setNoLocal(noLocal);
-        _subscription.setFilters(new SimpleFilterManager(messageFilter));
+        if(messageFilter!=null)
+        {
+            _subscription.setFilters(new SimpleFilterManager(messageFilter));
+        }
 
         try
         {

Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java?rev=1178400&r1=1178399&r2=1178400&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java Mon Oct  3 12:58:01 2011
@@ -24,6 +24,7 @@ import org.apache.qpid.amqp_1_0.transpor
 import org.apache.qpid.amqp_1_0.transport.ReceivingLinkEndpoint;
 import org.apache.qpid.amqp_1_0.transport.SendingLinkEndpoint;
 import org.apache.qpid.amqp_1_0.transport.SessionEventListener;
+import org.apache.qpid.amqp_1_0.type.AmqpErrorException;
 import org.apache.qpid.amqp_1_0.type.Binary;
 import org.apache.qpid.amqp_1_0.type.LifetimePolicy;
 import org.apache.qpid.amqp_1_0.type.Symbol;
@@ -130,22 +131,31 @@ public class Session_1_0 implements Sess
                 if(destination != null)
                 {
                     final SendingLinkEndpoint sendingLinkEndpoint = (SendingLinkEndpoint) endpoint;
-                    final SendingLink_1_0 sendingLink = new SendingLink_1_0(new SendingLinkAttachment(this, sendingLinkEndpoint),
-                                                                            _vhost,
-                                                                            (SendingDestination) destination
-                    );
-                    sendingLinkEndpoint.setLinkEventListener(sendingLink);
-                    link = sendingLink;
-                    if(TerminusDurability.UNSETTLED_STATE.equals(source.getDurable()))
+                    try
                     {
-                        linkRegistry.registerSendingLink(endpoint.getName(), sendingLink);
-                        sendingLink.setCloseAction(new Runnable() {
+                        final SendingLink_1_0 sendingLink = new SendingLink_1_0(new SendingLinkAttachment(this, sendingLinkEndpoint),
+                                                                                _vhost,
+                                                                                (SendingDestination) destination
+                        );
+                        sendingLinkEndpoint.setLinkEventListener(sendingLink);
+                        link = sendingLink;
+                        if(TerminusDurability.UNSETTLED_STATE.equals(source.getDurable()))
+                        {
+                            linkRegistry.registerSendingLink(endpoint.getName(), sendingLink);
+                            sendingLink.setCloseAction(new Runnable() {
 
-                            public void run()
-                            {
-                                linkRegistry.unregisterSendingLink(endpoint.getName());
-                            }
-                        });
+                                public void run()
+                                {
+                                    linkRegistry.unregisterSendingLink(endpoint.getName());
+                                }
+                            });
+                        }
+                    }
+                    catch(AmqpErrorException e)
+                    {
+                        destination = null;
+                        sendingLinkEndpoint.setSource(null);
+                        error = e.getError();
                     }
                 }
             }



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org