You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2011/10/03 14:58:02 UTC
svn commit: r1178400 - in /qpid/branches/rg-amqp-1-0-sandbox/qpid/java:
amqp-1-0-client-jms/example/src/main/java/org/apache/qpid/amqp_1_0/jms/example/
amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/
amqp-1-0-client/src/main/java/o...
Author: rgodfrey
Date: Mon Oct 3 12:58:01 2011
New Revision: 1178400
URL: http://svn.apache.org/viewvc?rev=1178400&view=rev
Log:
NO-JIRA : 1-0 Prototype JMS updates
Modified:
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/example/src/main/java/org/apache/qpid/amqp_1_0/jms/example/Hello.java
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/BytesMessageImpl.java
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/DestinationImpl.java
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MapMessageImpl.java
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageImpl.java
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueReceiverImpl.java
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TemporaryTopicImpl.java
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TextMessageImpl.java
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TopicSubscriberImpl.java
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Filereceiver.java
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receive.java
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Request.java
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Respond.java
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Session.java
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/transport/codec/ErrorConstructor.java
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData_1_0.java
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/example/src/main/java/org/apache/qpid/amqp_1_0/jms/example/Hello.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/example/src/main/java/org/apache/qpid/amqp_1_0/jms/example/Hello.java?rev=1178400&r1=1178399&r2=1178400&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/example/src/main/java/org/apache/qpid/amqp_1_0/jms/example/Hello.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/example/src/main/java/org/apache/qpid/amqp_1_0/jms/example/Hello.java Mon Oct 3 12:58:01 2011
@@ -59,7 +59,7 @@ public class Hello
Session consumerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageConsumer messageConsumer = consumerSession.createConsumer(queue, "hello='true'");
+ MessageConsumer messageConsumer = consumerSession.createConsumer(queue, "hello='true' and 7");
messageConsumer.setMessageListener(new MessageListener()
{
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/BytesMessageImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/BytesMessageImpl.java?rev=1178400&r1=1178399&r2=1178400&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/BytesMessageImpl.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/BytesMessageImpl.java Mon Oct 3 12:58:01 2011
@@ -469,7 +469,21 @@ public class BytesMessageImpl extends Me
public void reset() throws JMSException
{
- //TODO
+ if(_bytesOut != null)
+ {
+ byte[] data = _bytesOut.toByteArray();
+ _dataIn = new Data(new Binary(data));
+ _dataAsInput = new DataInputStream(new ByteArrayInputStream(data));
+ _dataAsOutput = null;
+ _bytesOut = null;
+ }
+ else
+ {
+
+ final Binary dataBuffer = _dataIn.getValue();
+ _dataAsInput = new DataInputStream(new ByteArrayInputStream(dataBuffer.getArray(),dataBuffer.getArrayOffset(),dataBuffer.getLength()));
+
+ }
}
private JMSException handleInputException(final IOException e)
@@ -496,6 +510,15 @@ public class BytesMessageImpl extends Me
return ex;
}
+ @Override
+ public void clearBody() throws JMSException
+ {
+ super.clearBody();
+ _bytesOut = new ByteArrayOutputStream();
+ _dataAsOutput = new DataOutputStream(_bytesOut);
+ _dataAsInput = null;
+ }
+
@Override Collection<Section> getSections()
{
List<Section> sections = new ArrayList<Section>();
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/DestinationImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/DestinationImpl.java?rev=1178400&r1=1178399&r2=1178400&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/DestinationImpl.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/DestinationImpl.java Mon Oct 3 12:58:01 2011
@@ -20,10 +20,13 @@
package org.apache.qpid.amqp_1_0.jms.impl;
import org.apache.qpid.amqp_1_0.jms.Destination;
+import org.apache.qpid.amqp_1_0.jms.Queue;
+import org.apache.qpid.amqp_1_0.jms.Topic;
+import javax.jms.JMSException;
import java.util.WeakHashMap;
-public class DestinationImpl implements Destination
+public class DestinationImpl implements Destination, Queue, Topic
{
private static final WeakHashMap<String, DestinationImpl> DESTINATION_CACHE =
new WeakHashMap<String, DestinationImpl>();
@@ -69,4 +72,14 @@ public class DestinationImpl implements
}
return destination;
}
+
+ public String getQueueName() throws JMSException
+ {
+ return getAddress();
+ }
+
+ public String getTopicName() throws JMSException
+ {
+ return getAddress();
+ }
}
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MapMessageImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MapMessageImpl.java?rev=1178400&r1=1178399&r2=1178400&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MapMessageImpl.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MapMessageImpl.java Mon Oct 3 12:58:01 2011
@@ -20,6 +20,7 @@
package org.apache.qpid.amqp_1_0.jms.impl;
import org.apache.qpid.amqp_1_0.jms.MapMessage;
+import org.apache.qpid.amqp_1_0.type.Binary;
import org.apache.qpid.amqp_1_0.type.Section;
import org.apache.qpid.amqp_1_0.type.messaging.*;
import org.apache.qpid.amqp_1_0.type.messaging.Properties;
@@ -240,7 +241,7 @@ public class MapMessageImpl extends Mess
{
return (String) value;
}
- else if (value instanceof byte[])
+ else if (value instanceof byte[] || value instanceof Binary)
{
throw new MessageFormatException("Property " + name + " of type byte[] " + "cannot be converted to String.");
}
@@ -262,6 +263,10 @@ public class MapMessageImpl extends Mess
{
return (byte[]) value;
}
+ else if(value instanceof Binary)
+ {
+ return ((Binary)value).getArray();
+ }
else
{
throw new MessageFormatException("Property " + name + " of type " + value.getClass().getName()
@@ -270,7 +275,8 @@ public class MapMessageImpl extends Mess
public Object getObject(String s) throws JMSException
{
- return get(s);
+ Object val = get(s);
+ return val instanceof Binary ? ((Binary)val).getArray() : val;
}
public Enumeration getMapNames() throws JMSException
@@ -362,7 +368,7 @@ public class MapMessageImpl extends Mess
System.arraycopy(bytes,offset,val,0,length);
}
- put(name, val);
+ put(name, new Binary(val));
}
public void setObject(String name, Object value) throws JMSException
@@ -406,6 +412,13 @@ public class MapMessageImpl extends Mess
return _map.keySet();
}
+ @Override
+ public void clearBody() throws JMSException
+ {
+ super.clearBody();
+ _map.clear();
+ }
+
private void checkPropertyName(String propName)
{
if ((propName == null) || propName.equals(""))
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java?rev=1178400&r1=1178399&r2=1178400&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java Mon Oct 3 12:58:01 2011
@@ -26,13 +26,12 @@ import org.apache.qpid.amqp_1_0.jms.Queu
import org.apache.qpid.amqp_1_0.jms.Queue;
import org.apache.qpid.amqp_1_0.jms.Topic;
import org.apache.qpid.amqp_1_0.jms.TopicSubscriber;
-import org.apache.qpid.amqp_1_0.type.Binary;
-import org.apache.qpid.amqp_1_0.type.Outcome;
-import org.apache.qpid.amqp_1_0.type.Symbol;
-import org.apache.qpid.amqp_1_0.type.UnsignedInteger;
+import org.apache.qpid.amqp_1_0.type.*;
import org.apache.qpid.amqp_1_0.type.messaging.Filter;
import org.apache.qpid.amqp_1_0.type.messaging.JMSSelectorFilter;
import org.apache.qpid.amqp_1_0.type.messaging.NoLocalFilter;
+import org.apache.qpid.amqp_1_0.type.transport.*;
+import org.apache.qpid.amqp_1_0.type.transport.Error;
import javax.jms.*;
import javax.jms.IllegalStateException;
@@ -79,18 +78,37 @@ public class MessageConsumerImpl impleme
}
else
{
- throw new InvalidDestinationException("Invalid destination class");
+ throw new InvalidDestinationException("Invalid destination class " + destination.getClass().getName());
}
_session = session;
_receiver = createClientReceiver();
+
}
- protected Receiver createClientReceiver() throws IllegalStateException
+ protected Receiver createClientReceiver() throws JMSException
{
- return _session.getClientSession(). createReceiver(_destination.getAddress(), AcknowledgeMode.ALO,
- null, false, getFilters(), null);
+ try
+ {
+ return _session.getClientSession(). createReceiver(_destination.getAddress(), AcknowledgeMode.ALO,
+ null, false, getFilters(), null);
+ }
+ catch (AmqpErrorException e)
+ {
+ Error error = e.getError();
+ if(AmqpError.INVALID_FIELD.equals(error.getCondition())
+ && error.getInfo() != null && Symbol.valueOf("filter").equals(error.getInfo().get(Symbol.valueOf
+ ("field"))))
+ {
+ throw new InvalidSelectorException(e.getMessage());
+ }
+ else
+ {
+ throw new JMSException(e.getMessage(), error.getCondition().getValue().toString());
+
+ }
+ }
}
private Map<Symbol, Filter> getFilters()
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageImpl.java?rev=1178400&r1=1178399&r2=1178400&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageImpl.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageImpl.java Mon Oct 3 12:58:01 2011
@@ -50,7 +50,7 @@ public abstract class MessageImpl implem
static final Set<Class> _supportedClasses =
new HashSet<Class>(Arrays.asList(Boolean.class, Byte.class, Short.class, Integer.class, Long.class,
Float.class, Double.class, Character.class, String.class, byte[].class));
- private static final Symbol JMS_TYPE = Symbol.valueOf("apache.qpid.amqp_1_0-jms-type");
+ private static final Symbol JMS_TYPE = Symbol.valueOf("jms-type");
private Header _header;
private Properties _properties;
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueReceiverImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueReceiverImpl.java?rev=1178400&r1=1178399&r2=1178400&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueReceiverImpl.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueReceiverImpl.java Mon Oct 3 12:58:01 2011
@@ -21,6 +21,7 @@ package org.apache.qpid.amqp_1_0.jms.imp
import org.apache.qpid.amqp_1_0.client.Receiver;
import org.apache.qpid.amqp_1_0.jms.Queue;
import org.apache.qpid.amqp_1_0.jms.QueueReceiver;
+import org.apache.qpid.amqp_1_0.type.AmqpErrorException;
import javax.jms.*;
@@ -36,9 +37,16 @@ public class QueueReceiverImpl extends M
setQueueConsumer(true);
}
- protected Receiver createClientReceiver() throws javax.jms.IllegalStateException
+ protected Receiver createClientReceiver() throws JMSException
{
- return getSession().getClientSession().createMovingReceiver(getDestination().getAddress());
+ try
+ {
+ return getSession().getClientSession().createMovingReceiver(getDestination().getAddress());
+ }
+ catch (AmqpErrorException e)
+ {
+ throw new JMSException(e.getMessage(), e.getError().getCondition().toString());
+ }
}
public Queue getQueue() throws JMSException
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java?rev=1178400&r1=1178399&r2=1178400&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java Mon Oct 3 12:58:01 2011
@@ -341,7 +341,17 @@ public class SessionImpl implements Sess
public TemporaryTopicImpl createTemporaryTopic() throws JMSException
{
checkClosed();
- return null; //TODO
+ try
+ {
+ Sender send = _session.createTemporaryQueueSender();
+
+ TemporaryTopicImpl tempQ = new TemporaryTopicImpl(((Target)send.getTarget()).getAddress(), send);
+ return tempQ;
+ }
+ catch (Sender.SenderCreationException e)
+ {
+ throw new JMSException("Unable to create temporary queue");
+ }
}
public void unsubscribe(final String s) throws JMSException
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TemporaryTopicImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TemporaryTopicImpl.java?rev=1178400&r1=1178399&r2=1178400&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TemporaryTopicImpl.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TemporaryTopicImpl.java Mon Oct 3 12:58:01 2011
@@ -18,19 +18,37 @@
*/
package org.apache.qpid.amqp_1_0.jms.impl;
+import org.apache.qpid.amqp_1_0.client.Sender;
import org.apache.qpid.amqp_1_0.jms.TemporaryTopic;
import javax.jms.JMSException;
public class TemporaryTopicImpl extends TopicImpl implements TemporaryTopic
{
- protected TemporaryTopicImpl(String address)
+ private Sender _sender;
+
+ protected TemporaryTopicImpl(String address, Sender sender)
{
super(address);
+ _sender = sender;
}
public void delete() throws JMSException
{
- //TODO
+ try
+ {
+ if(_sender != null)
+ {
+ _sender.close();
+ _sender = null;
+ }
+ }
+ catch (Sender.SenderClosingException e)
+ {
+ final JMSException jmsException = new JMSException(e.getMessage());
+ jmsException.setLinkedException(e);
+ throw jmsException;
+ }
}
+
}
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TextMessageImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TextMessageImpl.java?rev=1178400&r1=1178399&r2=1178400&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TextMessageImpl.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TextMessageImpl.java Mon Oct 3 12:58:01 2011
@@ -66,6 +66,13 @@ public class TextMessageImpl extends Mes
return _text;
}
+ @Override
+ public void clearBody() throws JMSException
+ {
+ super.clearBody();
+ _text = null;
+ }
+
@Override Collection<Section> getSections()
{
List<Section> sections = new ArrayList<Section>();
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TopicSubscriberImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TopicSubscriberImpl.java?rev=1178400&r1=1178399&r2=1178400&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TopicSubscriberImpl.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TopicSubscriberImpl.java Mon Oct 3 12:58:01 2011
@@ -21,6 +21,7 @@ package org.apache.qpid.amqp_1_0.jms.imp
import org.apache.qpid.amqp_1_0.client.Receiver;
import org.apache.qpid.amqp_1_0.jms.Topic;
import org.apache.qpid.amqp_1_0.jms.TopicSubscriber;
+import org.apache.qpid.amqp_1_0.type.AmqpErrorException;
import javax.jms.*;
@@ -42,9 +43,16 @@ public class TopicSubscriberImpl extends
}
- protected Receiver createClientReceiver() throws javax.jms.IllegalStateException
+ protected Receiver createClientReceiver() throws JMSException
{
- return getSession().getClientSession().createCopyingReceiver(getDestination().getAddress());
+ try
+ {
+ return getSession().getClientSession().createCopyingReceiver(getDestination().getAddress());
+ }
+ catch (AmqpErrorException e)
+ {
+ throw new JMSException(e.getMessage(), e.getError().getCondition().toString());
+ }
}
}
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Filereceiver.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Filereceiver.java?rev=1178400&r1=1178399&r2=1178400&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Filereceiver.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Filereceiver.java Mon Oct 3 12:58:01 2011
@@ -1,9 +1,6 @@
package org.apache.qpid.amqp_1_0.client;
-import org.apache.qpid.amqp_1_0.type.Binary;
-import org.apache.qpid.amqp_1_0.type.Outcome;
-import org.apache.qpid.amqp_1_0.type.Section;
-import org.apache.qpid.amqp_1_0.type.UnsignedInteger;
+import org.apache.qpid.amqp_1_0.type.*;
import org.apache.qpid.amqp_1_0.type.messaging.*;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Options;
@@ -316,6 +313,10 @@ public class Filereceiver extends Util
{
e.printStackTrace(); //TODO.
}
+ catch (AmqpErrorException e)
+ {
+ e.printStackTrace(); //TODO.
+ }
}
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receive.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receive.java?rev=1178400&r1=1178399&r2=1178400&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receive.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receive.java Mon Oct 3 12:58:01 2011
@@ -21,6 +21,7 @@
package org.apache.qpid.amqp_1_0.client;
+import org.apache.qpid.amqp_1_0.type.AmqpErrorException;
import org.apache.qpid.amqp_1_0.type.Symbol;
import org.apache.qpid.amqp_1_0.type.UnsignedInteger;
import org.apache.qpid.amqp_1_0.type.UnsignedLong;
@@ -229,6 +230,10 @@ public class Receive extends Util
{
e.printStackTrace(); //TODO.
}
+ catch (AmqpErrorException e)
+ {
+ e.printStackTrace(); //TODO.
+ }
}
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java?rev=1178400&r1=1178399&r2=1178400&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java Mon Oct 3 12:58:01 2011
@@ -22,6 +22,7 @@ package org.apache.qpid.amqp_1_0.client;
import org.apache.qpid.amqp_1_0.messaging.SectionDecoder;
import org.apache.qpid.amqp_1_0.transport.DeliveryStateHandler;
+import org.apache.qpid.amqp_1_0.transport.LinkEndpoint;
import org.apache.qpid.amqp_1_0.transport.ReceivingLinkEndpoint;
import org.apache.qpid.amqp_1_0.transport.ReceivingLinkListener;
@@ -31,9 +32,8 @@ import org.apache.qpid.amqp_1_0.type.mes
import org.apache.qpid.amqp_1_0.type.messaging.Source;
import org.apache.qpid.amqp_1_0.type.messaging.Target;
import org.apache.qpid.amqp_1_0.type.transaction.TransactionalState;
-import org.apache.qpid.amqp_1_0.type.transport.ReceiverSettleMode;
-import org.apache.qpid.amqp_1_0.type.transport.SenderSettleMode;
-import org.apache.qpid.amqp_1_0.type.transport.Transfer;
+import org.apache.qpid.amqp_1_0.type.transport.*;
+import org.apache.qpid.amqp_1_0.type.transport.Error;
import java.nio.ByteBuffer;
import java.util.*;
@@ -49,12 +49,13 @@ public class Receiver implements Deliver
private Queue<Transfer> _prefetchQueue = new ConcurrentLinkedQueue<Transfer>();
private Map<Binary, SettledAction> _unsettledMap = new HashMap<Binary, SettledAction>();
private MessageArrivalListener _messageArrivalListener;
+ private org.apache.qpid.amqp_1_0.type.transport.Error _error;
public Receiver(final Session session,
final String linkName,
final Target target,
final Source source,
- final AcknowledgeMode ackMode)
+ final AcknowledgeMode ackMode) throws AmqpErrorException
{
this(session, linkName, target, source, ackMode, false);
}
@@ -64,7 +65,7 @@ public class Receiver implements Deliver
final Target target,
final Source source,
final AcknowledgeMode ackMode,
- boolean isDurable)
+ boolean isDurable) throws AmqpErrorException
{
this(session,linkName,target,source,ackMode,isDurable,null);
}
@@ -75,7 +76,7 @@ public class Receiver implements Deliver
final Source source,
final AcknowledgeMode ackMode,
final boolean isDurable,
- final Map<Binary,Outcome> unsettled)
+ final Map<Binary,Outcome> unsettled) throws AmqpErrorException
{
_session = session;
@@ -113,6 +114,13 @@ public class Receiver implements Deliver
_prefetchQueue.add(xfr);
postPrefetchAction();
}
+
+ @Override
+ public void remoteDetached(final LinkEndpoint endpoint, final Detach detach)
+ {
+ _error = detach.getError();
+ super.remoteDetached(endpoint, detach);
+ }
});
_endpoint.setLocalUnsettled(unsettled);
@@ -121,7 +129,7 @@ public class Receiver implements Deliver
synchronized(_endpoint.getLock())
{
- while(!_endpoint.isAttached() || _endpoint.isDetached())
+ while(!_endpoint.isAttached() && !_endpoint.isDetached())
{
try
{
@@ -133,6 +141,25 @@ public class Receiver implements Deliver
}
}
}
+
+ if(_endpoint.getSource() == null)
+ {
+ synchronized(_endpoint.getLock())
+ {
+ while(!_endpoint.isDetached())
+ {
+ try
+ {
+ _endpoint.getLock().wait();
+ }
+ catch (InterruptedException e)
+ {
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ }
+ }
+ }
+ throw new AmqpErrorException(getError());
+ }
}
private void postPrefetchAction()
@@ -257,15 +284,15 @@ public class Receiver implements Deliver
synchronized(lock)
{
Transfer xfr;
- while(((xfr = _prefetchQueue.peek()) == null) && !_endpoint.isDrained())
+ while(((xfr = _prefetchQueue.peek()) == null) && !_endpoint.isDrained() && wait != 0)
{
try
{
- if(wait>=0L)
+ if(wait>0L)
{
lock.wait(wait);
}
- else
+ else if(wait<0L)
{
lock.wait();
}
@@ -356,6 +383,11 @@ public class Receiver implements Deliver
_endpoint.updateDisposition(deliveryTag,state, settled);
}
+ public Error getError()
+ {
+ return _error;
+ }
+
public void acknowledgeAll(Message m)
{
acknowledgeAll(m.getDeliveryTag());
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Request.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Request.java?rev=1178400&r1=1178399&r2=1178400&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Request.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Request.java Mon Oct 3 12:58:01 2011
@@ -21,6 +21,7 @@
package org.apache.qpid.amqp_1_0.client;
+import org.apache.qpid.amqp_1_0.type.AmqpErrorException;
import org.apache.qpid.amqp_1_0.type.Section;
import org.apache.qpid.amqp_1_0.type.UnsignedInteger;
import org.apache.qpid.amqp_1_0.type.UnsignedLong;
@@ -227,6 +228,10 @@ public class Request extends Util
{
e.printStackTrace(); //TODO.
}
+ catch (AmqpErrorException e)
+ {
+ e.printStackTrace(); //TODO.
+ }
}
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Respond.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Respond.java?rev=1178400&r1=1178399&r2=1178400&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Respond.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Respond.java Mon Oct 3 12:58:01 2011
@@ -21,6 +21,7 @@
package org.apache.qpid.amqp_1_0.client;
+import org.apache.qpid.amqp_1_0.type.AmqpErrorException;
import org.apache.qpid.amqp_1_0.type.Section;
import org.apache.qpid.amqp_1_0.type.UnsignedInteger;
import org.apache.qpid.amqp_1_0.type.UnsignedLong;
@@ -281,6 +282,10 @@ public class Respond extends Util
{
e.printStackTrace(); //TODO.
}
+ catch (AmqpErrorException e)
+ {
+ e.printStackTrace(); //TODO.
+ }
}
private void respond(Message m) throws Sender.SenderCreationException
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Session.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Session.java?rev=1178400&r1=1178399&r2=1178400&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Session.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Session.java Mon Oct 3 12:58:01 2011
@@ -26,10 +26,7 @@ import org.apache.qpid.amqp_1_0.messagin
import org.apache.qpid.amqp_1_0.messaging.SectionEncoderImpl;
import org.apache.qpid.amqp_1_0.transport.SendingLinkEndpoint;
import org.apache.qpid.amqp_1_0.transport.SessionEndpoint;
-import org.apache.qpid.amqp_1_0.type.Binary;
-import org.apache.qpid.amqp_1_0.type.DistributionMode;
-import org.apache.qpid.amqp_1_0.type.Outcome;
-import org.apache.qpid.amqp_1_0.type.Symbol;
+import org.apache.qpid.amqp_1_0.type.*;
import org.apache.qpid.amqp_1_0.type.messaging.Filter;
import org.apache.qpid.amqp_1_0.type.messaging.Source;
import org.apache.qpid.amqp_1_0.type.messaging.StdDistMode;
@@ -104,73 +101,79 @@ public class Session
}
- public Receiver createReceiver(final String sourceAddr)
+ public Receiver createReceiver(final String sourceAddr) throws AmqpErrorException
{
return createReceiver(sourceAddr, null, AcknowledgeMode.ALO);
}
- public Receiver createReceiver(final String queue, final AcknowledgeMode mode)
+ public Receiver createReceiver(final String queue, final AcknowledgeMode mode) throws AmqpErrorException
{
return createReceiver(queue, null, mode);
}
public Receiver createReceiver(final String queue, final AcknowledgeMode mode, String linkName)
+ throws AmqpErrorException
{
return createReceiver(queue, null, mode, linkName);
}
public Receiver createReceiver(final String queue, final AcknowledgeMode mode, String linkName, boolean isDurable)
+ throws AmqpErrorException
{
return createReceiver(queue, null, mode, linkName, isDurable);
}
public Receiver createReceiver(final String queue, final AcknowledgeMode mode, String linkName, boolean isDurable,
Map<Symbol, Filter> filters, Map<Binary, Outcome> unsettled)
+ throws AmqpErrorException
{
return createReceiver(queue, null, mode, linkName, isDurable, filters, unsettled);
}
public Receiver createReceiver(final String queue, final AcknowledgeMode mode, String linkName,
- boolean isDurable, Map<Binary, Outcome> unsettled)
+ boolean isDurable, Map<Binary, Outcome> unsettled) throws AmqpErrorException
{
return createReceiver(queue, null, mode, linkName, isDurable, unsettled);
}
private synchronized Receiver createReceiver(final String sourceAddr, DistributionMode mode)
+ throws AmqpErrorException
{
return createReceiver(sourceAddr, mode, AcknowledgeMode.ALO);
}
private synchronized Receiver createReceiver(final String sourceAddr, DistributionMode mode, String linkName)
+ throws AmqpErrorException
{
return createReceiver(sourceAddr, mode, AcknowledgeMode.ALO, linkName);
}
private synchronized Receiver createReceiver(final String sourceAddr, DistributionMode mode,
- final AcknowledgeMode ackMode)
+ final AcknowledgeMode ackMode) throws AmqpErrorException
{
return createReceiver(sourceAddr, mode, ackMode, null);
}
private synchronized Receiver createReceiver(final String sourceAddr, DistributionMode mode,
- final AcknowledgeMode ackMode, String linkName)
+ final AcknowledgeMode ackMode, String linkName) throws AmqpErrorException
{
return createReceiver(sourceAddr,mode, ackMode, linkName, false);
}
private synchronized Receiver createReceiver(final String sourceAddr, DistributionMode mode,
final AcknowledgeMode ackMode, String linkName, boolean isDurable)
+ throws AmqpErrorException
{
return createReceiver(sourceAddr, mode, ackMode, linkName, isDurable, null);
}
private synchronized Receiver createReceiver(final String sourceAddr, DistributionMode mode,
final AcknowledgeMode ackMode, String linkName, boolean isDurable,
- Map<Binary, Outcome> unsettled)
+ Map<Binary, Outcome> unsettled) throws AmqpErrorException
{
return createReceiver(sourceAddr,mode,ackMode, linkName, isDurable, null, unsettled);
}
@@ -178,6 +181,7 @@ public class Session
private synchronized Receiver createReceiver(final String sourceAddr, DistributionMode mode,
final AcknowledgeMode ackMode, String linkName, boolean isDurable,
Map<Symbol, Filter> filters, Map<Binary, Outcome> unsettled)
+ throws AmqpErrorException
{
final Target target = new Target();
@@ -200,17 +204,17 @@ public class Session
}
- public synchronized Receiver createCopyingReceiver(final String sourceAddr)
+ public synchronized Receiver createCopyingReceiver(final String sourceAddr) throws AmqpErrorException
{
return createReceiver(sourceAddr, StdDistMode.COPY);
}
- public synchronized Receiver createMovingReceiver(final String sourceAddr)
+ public synchronized Receiver createMovingReceiver(final String sourceAddr) throws AmqpErrorException
{
return createReceiver(sourceAddr, StdDistMode.MOVE);
}
- public Receiver createTemporaryQueueReceiver()
+ public Receiver createTemporaryQueueReceiver() throws AmqpErrorException
{
Source source = new Source();
source.setDynamic(true);
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/transport/codec/ErrorConstructor.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/transport/codec/ErrorConstructor.java?rev=1178400&r1=1178399&r2=1178400&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/transport/codec/ErrorConstructor.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/transport/codec/ErrorConstructor.java Mon Oct 3 12:58:01 2011
@@ -26,6 +26,7 @@ package org.apache.qpid.amqp_1_0.type.tr
import org.apache.qpid.amqp_1_0.codec.DescribedTypeConstructor;
import org.apache.qpid.amqp_1_0.codec.DescribedTypeConstructorRegistry;
import org.apache.qpid.amqp_1_0.type.*;
+import org.apache.qpid.amqp_1_0.type.transaction.TransactionErrors;
import org.apache.qpid.amqp_1_0.type.transport.*;
@@ -65,17 +66,32 @@ public class ErrorConstructor extends De
if(val != null)
{
-
- try
+ if(val instanceof ErrorCondition)
{
obj.setCondition( (ErrorCondition) val );
}
- catch(ClassCastException e)
+ else if(val instanceof Symbol)
{
-
- // TODO Error
+ ErrorCondition condition = null;
+ condition = AmqpError.valueOf(val);
+ if(condition == null)
+ {
+ condition = ConnectionError.valueOf(val);
+ if(condition == null)
+ {
+ condition = SessionError.valueOf(val);
+ if(condition == null)
+ {
+ condition = LinkError.valueOf(val);
+ if(condition == null)
+ {
+ condition = TransactionErrors.valueOf(val);
+ }
+ }
+ }
+ }
+ obj.setCondition(condition);
}
-
}
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData_1_0.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData_1_0.java?rev=1178400&r1=1178399&r2=1178400&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData_1_0.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData_1_0.java Mon Oct 3 12:58:01 2011
@@ -42,7 +42,7 @@ import java.util.*;
public class MessageMetaData_1_0 implements StorableMessageMetaData
{
// TODO move to somewhere more useful
- public static final Symbol JMS_TYPE = Symbol.valueOf("x-jms-type");
+ public static final Symbol JMS_TYPE = Symbol.valueOf("jms-type");
private Header _header;
@@ -437,13 +437,13 @@ public class MessageMetaData_1_0 impleme
public String getType()
{
- if(_appProperties == null || _appProperties.get(JMS_TYPE) == null)
+ if(_messageAnnotations == null || _messageAnnotations.get(JMS_TYPE) == null)
{
return null;
}
else
{
- return _appProperties.get(JMS_TYPE).toString();
+ return _messageAnnotations.get(JMS_TYPE).toString();
}
}
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java?rev=1178400&r1=1178399&r2=1178400&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java Mon Oct 3 12:58:01 2011
@@ -31,9 +31,9 @@ import org.apache.qpid.amqp_1_0.type.*;
import org.apache.qpid.amqp_1_0.type.messaging.*;
import org.apache.qpid.amqp_1_0.type.messaging.Source;
-import org.apache.qpid.amqp_1_0.type.transport.Detach;
-import org.apache.qpid.amqp_1_0.type.transport.Transfer;
+import org.apache.qpid.amqp_1_0.type.transport.*;
import org.apache.qpid.AMQException;
+import org.apache.qpid.amqp_1_0.type.transport.Error;
import org.apache.qpid.server.exchange.DirectExchange;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.exchange.ExchangeType;
@@ -71,6 +71,7 @@ public class SendingLink_1_0 implements
public SendingLink_1_0(final SendingLinkAttachment linkAttachment,
final VirtualHost vhost,
final SendingDestination destination)
+ throws AmqpErrorException
{
_vhost = vhost;
_destination = destination;
@@ -99,29 +100,36 @@ public class SendingLink_1_0 implements
Map<Symbol,Filter> actualFilters = new HashMap<Symbol,Filter>();
- for(Map.Entry<Symbol,Filter> entry : filters.entrySet())
+ if(filters != null)
{
- if(entry.getValue() instanceof NoLocalFilter)
+ for(Map.Entry<Symbol,Filter> entry : filters.entrySet())
{
- actualFilters.put(entry.getKey(), entry.getValue());
- noLocal = true;
- }
- else if(messageFilter == null && entry.getValue() instanceof JMSSelectorFilter)
- {
-
- JMSSelectorFilter selectorFilter = (JMSSelectorFilter) entry.getValue();
- try
+ if(entry.getValue() instanceof NoLocalFilter)
{
- messageFilter = new JMSSelectorMessageFilter(selectorFilter.getValue());
-
actualFilters.put(entry.getKey(), entry.getValue());
+ noLocal = true;
}
- catch (AMQInvalidArgumentException e)
+ else if(messageFilter == null && entry.getValue() instanceof JMSSelectorFilter)
{
- }
+ JMSSelectorFilter selectorFilter = (JMSSelectorFilter) entry.getValue();
+ try
+ {
+ messageFilter = new JMSSelectorMessageFilter(selectorFilter.getValue());
+
+ actualFilters.put(entry.getKey(), entry.getValue());
+ }
+ catch (AMQInvalidArgumentException e)
+ {
+ Error error = new Error();
+ error.setCondition(AmqpError.INVALID_FIELD);
+ error.setDescription("Invalid JMS Selector: " + selectorFilter.getValue());
+ error.setInfo(Collections.singletonMap(Symbol.valueOf("field"), Symbol.valueOf("filter")));
+ throw new AmqpErrorException(error);
+ }
+ }
}
}
source.setFilter(actualFilters.isEmpty() ? null : actualFilters);
@@ -182,7 +190,10 @@ public class SendingLink_1_0 implements
_subscription = new Subscription_1_0(this, qd);
_subscription.setNoLocal(noLocal);
- _subscription.setFilters(new SimpleFilterManager(messageFilter));
+ if(messageFilter!=null)
+ {
+ _subscription.setFilters(new SimpleFilterManager(messageFilter));
+ }
try
{
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java?rev=1178400&r1=1178399&r2=1178400&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java Mon Oct 3 12:58:01 2011
@@ -24,6 +24,7 @@ import org.apache.qpid.amqp_1_0.transpor
import org.apache.qpid.amqp_1_0.transport.ReceivingLinkEndpoint;
import org.apache.qpid.amqp_1_0.transport.SendingLinkEndpoint;
import org.apache.qpid.amqp_1_0.transport.SessionEventListener;
+import org.apache.qpid.amqp_1_0.type.AmqpErrorException;
import org.apache.qpid.amqp_1_0.type.Binary;
import org.apache.qpid.amqp_1_0.type.LifetimePolicy;
import org.apache.qpid.amqp_1_0.type.Symbol;
@@ -130,22 +131,31 @@ public class Session_1_0 implements Sess
if(destination != null)
{
final SendingLinkEndpoint sendingLinkEndpoint = (SendingLinkEndpoint) endpoint;
- final SendingLink_1_0 sendingLink = new SendingLink_1_0(new SendingLinkAttachment(this, sendingLinkEndpoint),
- _vhost,
- (SendingDestination) destination
- );
- sendingLinkEndpoint.setLinkEventListener(sendingLink);
- link = sendingLink;
- if(TerminusDurability.UNSETTLED_STATE.equals(source.getDurable()))
+ try
{
- linkRegistry.registerSendingLink(endpoint.getName(), sendingLink);
- sendingLink.setCloseAction(new Runnable() {
+ final SendingLink_1_0 sendingLink = new SendingLink_1_0(new SendingLinkAttachment(this, sendingLinkEndpoint),
+ _vhost,
+ (SendingDestination) destination
+ );
+ sendingLinkEndpoint.setLinkEventListener(sendingLink);
+ link = sendingLink;
+ if(TerminusDurability.UNSETTLED_STATE.equals(source.getDurable()))
+ {
+ linkRegistry.registerSendingLink(endpoint.getName(), sendingLink);
+ sendingLink.setCloseAction(new Runnable() {
- public void run()
- {
- linkRegistry.unregisterSendingLink(endpoint.getName());
- }
- });
+ public void run()
+ {
+ linkRegistry.unregisterSendingLink(endpoint.getName());
+ }
+ });
+ }
+ }
+ catch(AmqpErrorException e)
+ {
+ destination = null;
+ sendingLinkEndpoint.setSource(null);
+ error = e.getError();
}
}
}
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org