You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2011/09/04 23:08:41 UTC
svn commit: r1165110 - in /qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms: example/src/main/java/org/apache/qpid/amqp_1_0/jms/example/ src/main/java/org/apache/qpid/amqp_1_0/jms/impl/
Author: rgodfrey
Date: Sun Sep 4 21:08:40 2011
New Revision: 1165110
URL: http://svn.apache.org/viewvc?rev=1165110&view=rev
Log:
NO-JIRA: JMS Fixes
Modified:
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/example/src/main/java/org/apache/qpid/amqp_1_0/jms/example/Hello.java
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/AmqpMessageImpl.java
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/BytesMessageImpl.java
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionFactoryImpl.java
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MapMessageImpl.java
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageFactory.java
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageImpl.java
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageProducerImpl.java
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ObjectMessageImpl.java
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueImpl.java
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueReceiverImpl.java
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueSessionImpl.java
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/StreamMessageImpl.java
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TemporaryQueueImpl.java
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TextMessageImpl.java
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TopicImpl.java
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TopicSessionImpl.java
qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TopicSubscriberImpl.java
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/example/src/main/java/org/apache/qpid/amqp_1_0/jms/example/Hello.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/example/src/main/java/org/apache/qpid/amqp_1_0/jms/example/Hello.java?rev=1165110&r1=1165109&r2=1165110&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/example/src/main/java/org/apache/qpid/amqp_1_0/jms/example/Hello.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/example/src/main/java/org/apache/qpid/amqp_1_0/jms/example/Hello.java Sun Sep 4 21:08:40 2011
@@ -110,6 +110,18 @@ public class Hello
System.out.println(o.getClass().getName() + ": " + o);
}
+ else if(message instanceof ObjectMessage)
+ {
+ System.out.println("Received Object Message:");
+ System.out.println("========================");
+ ObjectMessage objectMessage = (ObjectMessage)message;
+ Object o = objectMessage.getObject();
+ System.out.println(o.getClass().getName() + ": " + o);
+ }
+ else
+ {
+ System.out.println("Received Message " + message.getClass().getName());
+ }
}
catch (JMSException e)
{
@@ -124,8 +136,9 @@ public class Hello
MessageProducer messageProducer = producersession.createProducer(queue);
TextMessage message = producersession.createTextMessage("Hello world!");
+ message.setJMSType("Hello");
messageProducer.send(message);
-
+ /*
MapMessage mapmessage = producersession.createMapMessage();
mapmessage.setBoolean("mybool", true);
mapmessage.setString("mystring", "hello");
@@ -139,13 +152,18 @@ public class Hello
messageProducer.send(bytesMessage);
- StreamMessage streamMessage = producersession.createStreamMessage();
+ ObjectMessage objectMessage = producersession.createObjectMessage();
+ objectMessage.setObject(new Double("3.14159265358979323846264338327950288"));
+
+ messageProducer.send(objectMessage);
+
+/* StreamMessage streamMessage = producersession.createStreamMessage();
streamMessage.writeBoolean(true);
streamMessage.writeLong(18031974L);
streamMessage.writeString("this is a stream Message");
streamMessage.writeChar('£');
messageProducer.send(streamMessage);
-
+*/
Thread.sleep(50000L);
connection.close();
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/AmqpMessageImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/AmqpMessageImpl.java?rev=1165110&r1=1165109&r2=1165110&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/AmqpMessageImpl.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/AmqpMessageImpl.java Sun Sep 4 21:08:40 2011
@@ -23,6 +23,7 @@ import org.apache.qpid.amqp_1_0.type.Sec
import org.apache.qpid.amqp_1_0.type.messaging.ApplicationProperties;
import org.apache.qpid.amqp_1_0.type.messaging.Footer;
import org.apache.qpid.amqp_1_0.type.messaging.Header;
+import org.apache.qpid.amqp_1_0.type.messaging.MessageAnnotations;
import org.apache.qpid.amqp_1_0.type.messaging.Properties;
import java.util.*;
@@ -31,16 +32,16 @@ public class AmqpMessageImpl extends Mes
{
private List<Section> _sections;
- protected AmqpMessageImpl(Header header, Properties properties, ApplicationProperties appProperties, List<Section> sections,
+ protected AmqpMessageImpl(Header header, MessageAnnotations messageAnnotations, Properties properties, ApplicationProperties appProperties, List<Section> sections,
Footer footer, SessionImpl session)
{
- super(header, properties, appProperties, footer, session);
+ super(header, messageAnnotations, properties, appProperties, footer, session);
_sections = sections;
}
protected AmqpMessageImpl(final SessionImpl session)
{
- super(new Header(), new Properties(), new ApplicationProperties(new HashMap()), new Footer(Collections.EMPTY_MAP),
+ super(new Header(), new MessageAnnotations(new HashMap()), new Properties(), new ApplicationProperties(new HashMap()), new Footer(Collections.EMPTY_MAP),
session);
_sections = new ArrayList<Section>();
}
@@ -64,6 +65,10 @@ public class AmqpMessageImpl extends Mes
{
List<Section> sections = new ArrayList<Section>();
sections.add(getHeader());
+ if(getMessageAnnotations() != null && getMessageAnnotations().getValue() != null && !getMessageAnnotations().getValue().isEmpty())
+ {
+ sections.add(getMessageAnnotations());
+ }
sections.add(getProperties());
sections.add(getApplicationProperties());
sections.addAll(_sections);
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/BytesMessageImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/BytesMessageImpl.java?rev=1165110&r1=1165109&r2=1165110&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/BytesMessageImpl.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/BytesMessageImpl.java Sun Sep 4 21:08:40 2011
@@ -39,10 +39,10 @@ public class BytesMessageImpl extends Me
private Data _dataIn;
// message created for reading
- protected BytesMessageImpl(Header header, Properties properties, ApplicationProperties appProperties, Data data,
+ protected BytesMessageImpl(Header header, MessageAnnotations messageAnnotations, Properties properties, ApplicationProperties appProperties, Data data,
Footer footer, SessionImpl session)
{
- super(header, properties, appProperties, footer, session);
+ super(header, messageAnnotations, properties, appProperties, footer, session);
_dataIn = data;
final Binary dataBuffer = data.getValue();
_dataAsInput = new DataInputStream(new ByteArrayInputStream(dataBuffer.getArray(),dataBuffer.getArrayOffset(),dataBuffer.getLength()));
@@ -52,7 +52,11 @@ public class BytesMessageImpl extends Me
// message created to be sent
protected BytesMessageImpl(final SessionImpl session)
{
- super(new Header(), new Properties(), new ApplicationProperties(new HashMap()), new Footer(Collections.EMPTY_MAP),
+ super(new Header(),
+ new MessageAnnotations(new HashMap()),
+ new Properties(),
+ new ApplicationProperties(new HashMap()),
+ new Footer(Collections.EMPTY_MAP),
session);
_bytesOut = new ByteArrayOutputStream();
@@ -496,6 +500,10 @@ public class BytesMessageImpl extends Me
{
List<Section> sections = new ArrayList<Section>();
sections.add(getHeader());
+ if(getMessageAnnotations() != null && getMessageAnnotations().getValue() != null && !getMessageAnnotations().getValue().isEmpty())
+ {
+ sections.add(getMessageAnnotations());
+ }
sections.add(getProperties());
sections.add(getApplicationProperties());
sections.add(getDataSection());
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionFactoryImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionFactoryImpl.java?rev=1165110&r1=1165109&r2=1165110&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionFactoryImpl.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionFactoryImpl.java Sun Sep 4 21:08:40 2011
@@ -104,21 +104,29 @@ public class ConnectionFactoryImpl imple
public QueueConnection createQueueConnection() throws JMSException
{
- return createConnection();
+ final ConnectionImpl connection = createConnection();
+ connection.setQueueConnection(true);
+ return connection;
}
public QueueConnection createQueueConnection(final String username, final String password) throws JMSException
{
- return createConnection(username, password);
+ final ConnectionImpl connection = createConnection(username, password);
+ connection.setQueueConnection(true);
+ return connection;
}
public TopicConnection createTopicConnection() throws JMSException
{
- return createConnection();
+ final ConnectionImpl connection = createConnection();
+ connection.setTopicConnection(true);
+ return connection;
}
public TopicConnection createTopicConnection(final String username, final String password) throws JMSException
{
- return createConnection(username, password);
+ final ConnectionImpl connection = createConnection(username, password);
+ connection.setTopicConnection(true);
+ return connection;
}
}
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java?rev=1165110&r1=1165109&r2=1165110&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java Sun Sep 4 21:08:40 2011
@@ -21,6 +21,7 @@ package org.apache.qpid.amqp_1_0.jms.imp
import org.apache.qpid.amqp_1_0.jms.Connection;
import org.apache.qpid.amqp_1_0.jms.ConnectionMetaData;
import org.apache.qpid.amqp_1_0.jms.Session;
+import org.apache.qpid.amqp_1_0.transport.Container;
import javax.jms.*;
import javax.jms.IllegalStateException;
@@ -38,6 +39,8 @@ public class ConnectionImpl implements C
private final Object _lock = new Object();
private org.apache.qpid.amqp_1_0.client.Connection _conn;
+ private boolean _isQueueConnection;
+ private boolean _isTopicConnection;
private static enum State
@@ -51,10 +54,11 @@ public class ConnectionImpl implements C
public ConnectionImpl(String host, int port, String username, String password, String clientId) throws JMSException
{
+ Container container = clientId == null ? new Container() : new Container(clientId);
// TODO - authentication, containerId, clientId, ssl?, etc
try
{
- _conn = new org.apache.qpid.amqp_1_0.client.Connection(host, port, username, password);
+ _conn = new org.apache.qpid.amqp_1_0.client.Connection(host, port, username, password, container);
// TODO - retrieve negotiated AMQP version
_connectionMetaData = new ConnectionMetaDataImpl(1,0,0);
}
@@ -97,7 +101,8 @@ public class ConnectionImpl implements C
}
SessionImpl session = new SessionImpl(this, acknowledgeMode);
-
+ session.setQueueSession(_isQueueConnection);
+ session.setTopicSession(_isTopicConnection);
_sessions.add(session);
return session;
@@ -108,7 +113,7 @@ public class ConnectionImpl implements C
public String getClientID() throws JMSException
{
checkClosed();
- return null; //TODO
+ return _conn.getEndpoint().getContainer().getId();
}
public void setClientID(final String s) throws JMSException
@@ -269,4 +274,13 @@ public class ConnectionImpl implements C
}
}
+ void setQueueConnection(final boolean queueConnection)
+ {
+ _isQueueConnection = queueConnection;
+ }
+
+ void setTopicConnection(final boolean topicConnection)
+ {
+ _isTopicConnection = topicConnection;
+ }
}
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MapMessageImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MapMessageImpl.java?rev=1165110&r1=1165109&r2=1165110&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MapMessageImpl.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MapMessageImpl.java Sun Sep 4 21:08:40 2011
@@ -32,17 +32,18 @@ public class MapMessageImpl extends Mess
{
private Map _map;
- public MapMessageImpl(Header header, Properties properties, ApplicationProperties appProperties, Map map,
+ public MapMessageImpl(Header header, MessageAnnotations messageAnnotations, Properties properties, ApplicationProperties appProperties, Map map,
Footer footer,
SessionImpl session)
{
- super(header, properties, appProperties, footer, session);
+ super(header, messageAnnotations, properties, appProperties, footer, session);
_map = map;
}
MapMessageImpl(final SessionImpl session)
{
- super(new Header(), new Properties(), new ApplicationProperties(new HashMap()), new Footer(Collections.EMPTY_MAP),
+ super(new Header(), new MessageAnnotations(new HashMap()),
+ new Properties(), new ApplicationProperties(new HashMap()), new Footer(Collections.EMPTY_MAP),
session);
_map = new HashMap();
}
@@ -417,6 +418,10 @@ public class MapMessageImpl extends Mess
{
List<Section> sections = new ArrayList<Section>();
sections.add(getHeader());
+ if(getMessageAnnotations() != null && getMessageAnnotations().getValue() != null && !getMessageAnnotations().getValue().isEmpty())
+ {
+ sections.add(getMessageAnnotations());
+ }
sections.add(getProperties());
sections.add(getApplicationProperties());
sections.add(new AmqpValue(_map));
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java?rev=1165110&r1=1165109&r2=1165110&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java Sun Sep 4 21:08:40 2011
@@ -41,6 +41,9 @@ public class MessageConsumerImpl impleme
private Binary _lastUnackedMessage;
MessageListener _messageListener;
+ private boolean _isQueueConsumer;
+ private boolean _isTopicSubscriber;
+
private boolean _closed = false;
MessageConsumerImpl(final Destination destination,
@@ -53,10 +56,18 @@ public class MessageConsumerImpl impleme
if(destination instanceof DestinationImpl)
{
_destination = (DestinationImpl) destination;
+ if(destination instanceof javax.jms.Queue)
+ {
+ _isQueueConsumer = true;
+ }
+ else if(destination instanceof javax.jms.Topic)
+ {
+ _isTopicSubscriber = true;
+ }
}
- else if(destination != null)
+ else
{
- // TODO - throw appropriate exception
+ throw new InvalidDestinationException("Invalid destination class");
}
_session = session;
@@ -64,7 +75,7 @@ public class MessageConsumerImpl impleme
}
- protected Receiver createClientReceiver()
+ protected Receiver createClientReceiver() throws IllegalStateException
{
return _session.getClientSession().createReceiver(_destination.getAddress());
}
@@ -142,7 +153,10 @@ public class MessageConsumerImpl impleme
if(msg != null)
{
MessageFactory factory = _session.getMessageFactory();
- return factory.createMessage(_destination, msg);
+ final MessageImpl message = factory.createMessage(_destination, msg);
+ message.setFromQueue(_isQueueConsumer);
+ message.setFromTopic(_isTopicSubscriber);
+ return message;
}
else
{
@@ -230,4 +244,14 @@ public class MessageConsumerImpl impleme
{
return (Topic) getDestination();
}
+
+ void setQueueConsumer(final boolean queueConsumer)
+ {
+ _isQueueConsumer = queueConsumer;
+ }
+
+ void setTopicSubscriber(final boolean topicSubscriber)
+ {
+ _isTopicSubscriber = topicSubscriber;
+ }
}
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageFactory.java?rev=1165110&r1=1165109&r2=1165110&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageFactory.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageFactory.java Sun Sep 4 21:08:40 2011
@@ -46,6 +46,8 @@ class MessageFactory
MessageImpl message;
List<Section> payload = msg.getPayload();
Header header = null;
+ MessageAnnotations messageAnnotations = null;
+
Properties properties = null;
ApplicationProperties appProperties = null;
Footer footer;
@@ -61,6 +63,12 @@ class MessageFactory
section = iter.hasNext() ? iter.next() : null;
}
+ if(section instanceof MessageAnnotations)
+ {
+ messageAnnotations = (MessageAnnotations) section;
+ section = iter.hasNext() ? iter.next() : null;
+ }
+
if(section instanceof Properties)
{
properties = (Properties) section;
@@ -86,16 +94,16 @@ class MessageFactory
Section bodySection = body.get(0);
if(bodySection instanceof AmqpValue && ((AmqpValue)bodySection).getValue() instanceof Map)
{
- message = new MapMessageImpl(header, properties, appProperties, (Map) ((AmqpValue)bodySection).getValue(), footer, _session);
+ message = new MapMessageImpl(header, messageAnnotations, properties, appProperties, (Map) ((AmqpValue)bodySection).getValue(), footer, _session);
}
else if(bodySection instanceof AmqpValue && ((AmqpValue)bodySection).getValue() instanceof List)
{
- message = new StreamMessageImpl(header, properties, appProperties,
+ message = new StreamMessageImpl(header, messageAnnotations, properties, appProperties,
(List) ((AmqpValue)bodySection).getValue(), footer, _session);
}
else if(bodySection instanceof AmqpValue && ((AmqpValue)bodySection).getValue() instanceof String)
{
- message = new TextMessageImpl(header, properties, appProperties,
+ message = new TextMessageImpl(header, messageAnnotations, properties, appProperties,
(String) ((AmqpValue)bodySection).getValue(), footer, _session);
}
else if(bodySection instanceof Data)
@@ -120,16 +128,16 @@ class MessageFactory
e.printStackTrace(); //TODO
}
- message = new ObjectMessageImpl(header, properties, appProperties, serializable, footer, _session);
+ message = new ObjectMessageImpl(header, messageAnnotations, properties, appProperties, serializable, footer, _session);
}
else
{
- message = new BytesMessageImpl(header, properties, appProperties, (Data) bodySection, footer, _session);
+ message = new BytesMessageImpl(header, messageAnnotations, properties, appProperties, (Data) bodySection, footer, _session);
}
}
else if(bodySection instanceof AmqpSequence)
{
- message = new StreamMessageImpl(header, properties, appProperties, ((AmqpSequence) bodySection).getValue(), footer, _session);
+ message = new StreamMessageImpl(header, messageAnnotations, properties, appProperties, ((AmqpSequence) bodySection).getValue(), footer, _session);
}
/*else if(bodySection instanceof AmqpDataSection)
@@ -174,12 +182,12 @@ class MessageFactory
}*/
else
{
- message = new AmqpMessageImpl(header,properties,appProperties,body,footer, _session);
+ message = new AmqpMessageImpl(header,messageAnnotations, properties,appProperties,body,footer, _session);
}
}
else
{
- message = new AmqpMessageImpl(header,properties,appProperties,body,footer, _session);
+ message = new AmqpMessageImpl(header,messageAnnotations, properties,appProperties,body,footer, _session);
}
message.setReadOnly();
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageImpl.java?rev=1165110&r1=1165109&r2=1165110&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageImpl.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageImpl.java Sun Sep 4 21:08:40 2011
@@ -33,6 +33,7 @@ import org.apache.qpid.amqp_1_0.type.Uns
import org.apache.qpid.amqp_1_0.type.messaging.ApplicationProperties;
import org.apache.qpid.amqp_1_0.type.messaging.Footer;
import org.apache.qpid.amqp_1_0.type.messaging.Header;
+import org.apache.qpid.amqp_1_0.type.messaging.MessageAnnotations;
import org.apache.qpid.amqp_1_0.type.messaging.Properties;
import javax.jms.DeliveryMode;
@@ -49,6 +50,7 @@ public abstract class MessageImpl implem
static final Set<Class> _supportedClasses =
new HashSet<Class>(Arrays.asList(Boolean.class, Byte.class, Short.class, Integer.class, Long.class,
Float.class, Double.class, Character.class, String.class, byte[].class));
+ private static final Symbol JMS_TYPE = Symbol.valueOf("apache.qpid.amqp_1_0-jms-type");
private Header _header;
private Properties _properties;
@@ -57,8 +59,13 @@ public abstract class MessageImpl implem
public static final Charset UTF_8_CHARSET = Charset.forName("UTF-8");
private SessionImpl _sessionImpl;
private boolean _readOnly;
+ private MessageAnnotations _messageAnnotations;
+
+ private boolean _isFromQueue;
+ private boolean _isFromTopic;
protected MessageImpl(Header header,
+ MessageAnnotations messageAnnotations,
Properties properties,
ApplicationProperties appProperties,
Footer footer,
@@ -66,6 +73,7 @@ public abstract class MessageImpl implem
{
_header = header == null ? new Header() : header;
_properties = properties == null ? new Properties() : properties;
+ _messageAnnotations = messageAnnotations == null ? new MessageAnnotations(new HashMap()) : messageAnnotations;
_footer = footer == null ? new Footer(Collections.EMPTY_MAP) : footer;
_applicationProperties = appProperties == null ? new ApplicationProperties(new HashMap()) : appProperties;
_sessionImpl = session;
@@ -170,7 +178,9 @@ public abstract class MessageImpl implem
public DestinationImpl getJMSDestination() throws JMSException
{
- return DestinationImpl.valueOf(getTo());
+ return _isFromQueue ? QueueImpl.valueOf(getTo())
+ : _isFromTopic ? TopicImpl.valueOf(getTo())
+ : DestinationImpl.valueOf(getTo());
}
public void setJMSDestination(Destination destination) throws NonAMQPDestinationException
@@ -240,20 +250,22 @@ public abstract class MessageImpl implem
public String getJMSType() throws JMSException
{
- final MessageAttributes messageAttrs = getHeaderMessageAttrs();
- final Object attrValue = messageAttrs == null ? null : messageAttrs.get(Symbol.valueOf("apache.qpid.amqp_1_0-jms-type"));
+ Map messageAttrs = _messageAnnotations == null ? null : _messageAnnotations.getValue();
+ final Object attrValue = messageAttrs == null ? null : messageAttrs.get(JMS_TYPE);
return attrValue instanceof String ? attrValue.toString() : null;
}
public void setJMSType(String s) throws JMSException
{
- MessageAttributes messageAttrs = getHeaderMessageAttrs();
+ Map messageAttrs = _messageAnnotations == null ? null : _messageAnnotations.getValue();
if(messageAttrs == null)
{
- // TODO - Solve MessageAttrs problem
- messageAttrs = null;
+ messageAttrs = new HashMap();
+ _messageAnnotations = new MessageAnnotations(messageAttrs);
}
+
+ messageAttrs.put(JMS_TYPE, s);
}
public long getJMSExpiration() throws JMSException
@@ -1056,10 +1068,30 @@ public abstract class MessageImpl implem
return _footer;
}
+ MessageAnnotations getMessageAnnotations()
+ {
+ return _messageAnnotations;
+ }
+
public ApplicationProperties getApplicationProperties()
{
return _applicationProperties;
}
+ public void reset() throws JMSException
+ {
+ _readOnly = true;
+ }
+
+ void setFromQueue(final boolean fromQueue)
+ {
+ _isFromQueue = fromQueue;
+ }
+
+ void setFromTopic(final boolean fromTopic)
+ {
+ _isFromTopic = fromTopic;
+ }
+
abstract Collection<Section> getSections();
}
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageProducerImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageProducerImpl.java?rev=1165110&r1=1165109&r2=1165110&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageProducerImpl.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageProducerImpl.java Sun Sep 4 21:08:40 2011
@@ -52,21 +52,24 @@ public class MessageProducerImpl impleme
}
else if(destination != null)
{
- // TODO - throw appropriate exception
+ throw new InvalidDestinationException("Invalid Destination Class" + destination.getClass().getName());
}
_session = session;
- try
- {
- _sender = _session.getClientSession().createSender(_destination.getAddress());
- }
- catch (Sender.SenderCreationException e)
+ if(_destination != null)
{
- // TODO - refine exception
- JMSException jmsEx = new JMSException(e.getMessage());
- jmsEx.initCause(e);
- jmsEx.setLinkedException(e);
- throw jmsEx;
+ try
+ {
+ _sender = _session.getClientSession().createSender(_destination.getAddress());
+ }
+ catch (Sender.SenderCreationException e)
+ {
+ // TODO - refine exception
+ JMSException jmsEx = new JMSException(e.getMessage());
+ jmsEx.initCause(e);
+ jmsEx.setLinkedException(e);
+ throw jmsEx;
+ }
}
}
@@ -151,7 +154,10 @@ public class MessageProducerImpl impleme
if(!_closed)
{
_closed = true;
- _sender.close();
+ if(_sender != null)
+ {
+ _sender.close();
+ }
}
}
@@ -255,7 +261,55 @@ public class MessageProducerImpl impleme
public void send(final Destination destination, final Message message, final int deliveryMode, final int priority, final long ttl)
throws JMSException
{
- //TODO
+
+ checkClosed();
+ if(destination == null)
+ {
+ send(message, deliveryMode, priority, ttl);
+ }
+ else
+ {
+ if(_destination != null)
+ {
+ throw new UnsupportedOperationException("Cannot use explicit destination pon non-anonymous producer");
+ }
+ else if(!(destination instanceof DestinationImpl))
+ {
+ throw new InvalidDestinationException("Invalid Destination Class" + destination.getClass().getName());
+ }
+ try
+ {
+ _destination = (DestinationImpl) destination;
+ _sender = _session.getClientSession().createSender(_destination.getAddress());
+
+ send(message, deliveryMode, priority, ttl);
+
+ _sender.close();
+
+
+
+ }
+ catch (Sender.SenderCreationException e)
+ {
+ // TODO - refine exception
+ JMSException jmsEx = new JMSException(e.getMessage());
+ jmsEx.initCause(e);
+ jmsEx.setLinkedException(e);
+ throw jmsEx;
+ }
+ catch (Sender.SenderClosingException e)
+ {
+ JMSException jmsEx = new JMSException(e.getMessage());
+ jmsEx.initCause(e);
+ jmsEx.setLinkedException(e);
+ throw jmsEx;
+ }
+ finally
+ {
+ _sender = null;
+ _destination = null;
+ }
+ }
}
public Queue getQueue() throws JMSException
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ObjectMessageImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ObjectMessageImpl.java?rev=1165110&r1=1165109&r2=1165110&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ObjectMessageImpl.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ObjectMessageImpl.java Sun Sep 4 21:08:40 2011
@@ -40,20 +40,22 @@ public class ObjectMessageImpl extends M
private Serializable _object;
protected ObjectMessageImpl(Header header,
+ MessageAnnotations messageAnnotations,
Properties properties,
ApplicationProperties appProperties,
Serializable object,
Footer footer,
SessionImpl session)
{
- super(header, properties, appProperties, footer, session);
+ super(header, messageAnnotations, properties, appProperties, footer, session);
getProperties().setContentType(CONTENT_TYPE);
_object = object;
}
protected ObjectMessageImpl(final SessionImpl session)
{
- super(new Header(), new Properties(), new ApplicationProperties(new HashMap()), new Footer(Collections.EMPTY_MAP),
+ super(new Header(), new MessageAnnotations(new HashMap()),
+ new Properties(), new ApplicationProperties(new HashMap()), new Footer(Collections.EMPTY_MAP),
session);
getProperties().setContentType(CONTENT_TYPE);
}
@@ -74,6 +76,10 @@ public class ObjectMessageImpl extends M
{
List<Section> sections = new ArrayList<Section>();
sections.add(getHeader());
+ if(getMessageAnnotations() != null && getMessageAnnotations().getValue() != null && !getMessageAnnotations().getValue().isEmpty())
+ {
+ sections.add(getMessageAnnotations());
+ }
sections.add(getProperties());
sections.add(getApplicationProperties());
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueImpl.java?rev=1165110&r1=1165109&r2=1165110&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueImpl.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueImpl.java Sun Sep 4 21:08:40 2011
@@ -48,4 +48,9 @@ public class QueueImpl extends Destinati
return queue;
}
+ /**
+  * Null-tolerant factory: returns null for a null address, otherwise the
+  * result of createQueue(address).
+  */
+ public static QueueImpl valueOf(String address)
+ {
+     if(address != null)
+     {
+         return createQueue(address);
+     }
+     return null;
+ }
+
}
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueReceiverImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueReceiverImpl.java?rev=1165110&r1=1165109&r2=1165110&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueReceiverImpl.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueReceiverImpl.java Sun Sep 4 21:08:40 2011
@@ -22,7 +22,7 @@ import org.apache.qpid.amqp_1_0.client.R
import org.apache.qpid.amqp_1_0.jms.Queue;
import org.apache.qpid.amqp_1_0.jms.QueueReceiver;
-import javax.jms.JMSException;
+import javax.jms.*;
public class QueueReceiverImpl extends MessageConsumerImpl implements QueueReceiver
{
@@ -33,9 +33,10 @@ public class QueueReceiverImpl extends M
throws JMSException
{
super(destination, session, selector, noLocal);
+ setQueueConsumer(true);
}
- protected Receiver createClientReceiver()
+ protected Receiver createClientReceiver() throws javax.jms.IllegalStateException
{
return getSession().getClientSession().createMovingReceiver(getDestination().getAddress());
}
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueSessionImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueSessionImpl.java?rev=1165110&r1=1165109&r2=1165110&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueSessionImpl.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/QueueSessionImpl.java Sun Sep 4 21:08:40 2011
@@ -28,6 +28,7 @@ public class QueueSessionImpl extends Se
protected QueueSessionImpl(final ConnectionImpl connection, final AcknowledgeMode acknowledgeMode)
{
super(connection, acknowledgeMode);
+ setQueueSession(true);
}
public QueueReceiverImpl createReceiver(final Queue queue) throws JMSException
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java?rev=1165110&r1=1165109&r2=1165110&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java Sun Sep 4 21:08:40 2011
@@ -20,6 +20,7 @@ package org.apache.qpid.amqp_1_0.jms.imp
import org.apache.qpid.amqp_1_0.client.Connection;
import org.apache.qpid.amqp_1_0.client.Message;
+import org.apache.qpid.amqp_1_0.client.Sender;
import org.apache.qpid.amqp_1_0.jms.QueueReceiver;
import org.apache.qpid.amqp_1_0.jms.QueueSender;
import org.apache.qpid.amqp_1_0.jms.QueueSession;
@@ -27,6 +28,7 @@ import org.apache.qpid.amqp_1_0.jms.Sess
import org.apache.qpid.amqp_1_0.jms.TopicPublisher;
import org.apache.qpid.amqp_1_0.jms.TopicSession;
import org.apache.qpid.amqp_1_0.jms.TopicSubscriber;
+import org.apache.qpid.amqp_1_0.type.messaging.Target;
import javax.jms.*;
import javax.jms.IllegalStateException;
@@ -50,6 +52,9 @@ public class SessionImpl implements Sess
private boolean _closed;
+ private boolean _isQueueSession;
+ private boolean _isTopicSession;
+
protected SessionImpl(final ConnectionImpl connection, final AcknowledgeMode acknowledgeMode)
{
_connection = connection;
@@ -102,8 +107,7 @@ public class SessionImpl implements Sess
public TextMessageImpl createTextMessage() throws JMSException
{
- checkClosed();
- return new TextMessageImpl(this);
+ return createTextMessage("");
}
public TextMessageImpl createTextMessage(final String s) throws JMSException
@@ -293,6 +297,10 @@ public class SessionImpl implements Sess
throws JMSException
{
checkClosed();
+ if(!(topic instanceof TopicImpl))
+ {
+ throw new InvalidDestinationException("invalid destination " + topic);
+ }
return null; //TODO
}
@@ -317,7 +325,17 @@ public class SessionImpl implements Sess
+ /**
+  * Creates a temporary queue backed by a sender obtained from
+  * createTemporaryQueueSender(); the queue's address is taken from that
+  * sender's target.
+  *
+  * @throws JMSException if the session is closed or the sender cannot be
+  *         created (the underlying failure is attached as cause and linked exception)
+  */
public TemporaryQueueImpl createTemporaryQueue() throws JMSException
{
checkClosed();
- return null; //TODO
+ try
+ {
+     Sender send = _session.createTemporaryQueueSender();
+
+     TemporaryQueueImpl tempQ = new TemporaryQueueImpl(((Target)send.getTarget()).getAddress(), send);
+     return tempQ;
+ }
+ catch (Sender.SenderCreationException e)
+ {
+     // Keep the root cause attached so callers can see why creation failed.
+     JMSException jmsEx = new JMSException("Unable to create temporary queue");
+     jmsEx.initCause(e);
+     jmsEx.setLinkedException(e);
+     throw jmsEx;
+ }
}
public TemporaryTopicImpl createTemporaryTopic() throws JMSException
@@ -329,6 +347,7 @@ public class SessionImpl implements Sess
public void unsubscribe(final String s) throws JMSException
{
checkClosed();
+
//TODO
}
@@ -605,4 +624,14 @@ public class SessionImpl implements Sess
}
}
}
+
+ void setQueueSession(final boolean queueSession)
+ {
+ _isQueueSession = queueSession;
+ }
+
+ void setTopicSession(final boolean topicSession)
+ {
+ _isTopicSession = topicSession;
+ }
}
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/StreamMessageImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/StreamMessageImpl.java?rev=1165110&r1=1165109&r2=1165110&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/StreamMessageImpl.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/StreamMessageImpl.java Sun Sep 4 21:08:40 2011
@@ -37,26 +37,28 @@ public class StreamMessageImpl extends M
- protected StreamMessageImpl(Header header, Properties properties, ApplicationProperties appProperties, List list,
+ protected StreamMessageImpl(Header header, MessageAnnotations messageAnnotations, Properties properties, ApplicationProperties appProperties, List list,
Footer footer, SessionImpl session)
{
- super(header, properties, appProperties, footer, session);
+ super(header, messageAnnotations, properties, appProperties, footer, session);
_list = list;
}
StreamMessageImpl(final SessionImpl session)
{
- super(new Header(), new Properties(), new ApplicationProperties(new HashMap()), new Footer(Collections.EMPTY_MAP),
+ super(new Header(), new MessageAnnotations(new HashMap()), new Properties(),
+ new ApplicationProperties(new HashMap()), new Footer(Collections.EMPTY_MAP),
session);
_list = new ArrayList();
}
public StreamMessageImpl(final Header header,
+ final MessageAnnotations messageAnnotations,
final Properties properties,
final ApplicationProperties appProperties,
final List amqpListSection, final Footer footer)
{
- super(header, properties, appProperties, footer, null);
+ super(header, messageAnnotations, properties, appProperties, footer, null);
_list = amqpListSection;
}
@@ -257,6 +259,7 @@ public class StreamMessageImpl extends M
public Object readObject() throws JMSException
{
+ checkReadable();
if(_offset == -1)
{
return _list.get(++_position);
@@ -332,6 +335,7 @@ public class StreamMessageImpl extends M
public void reset() throws JMSException
{
+ super.reset();
_position = -1;
_offset = -1;
}
@@ -340,6 +344,10 @@ public class StreamMessageImpl extends M
{
List<Section> sections = new ArrayList<Section>();
sections.add(getHeader());
+ if(getMessageAnnotations() != null && getMessageAnnotations().getValue() != null && !getMessageAnnotations().getValue().isEmpty())
+ {
+ sections.add(getMessageAnnotations());
+ }
sections.add(getProperties());
sections.add(getApplicationProperties());
sections.add(new AmqpValue(_list));
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TemporaryQueueImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TemporaryQueueImpl.java?rev=1165110&r1=1165109&r2=1165110&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TemporaryQueueImpl.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TemporaryQueueImpl.java Sun Sep 4 21:08:40 2011
@@ -18,19 +18,36 @@
*/
package org.apache.qpid.amqp_1_0.jms.impl;
+import org.apache.qpid.amqp_1_0.client.Sender;
import org.apache.qpid.amqp_1_0.jms.TemporaryQueue;
import javax.jms.JMSException;
+ /**
+  * Temporary queue that holds the Sender passed at construction;
+  * delete() closes that sender.
+  */
public class TemporaryQueueImpl extends QueueImpl implements TemporaryQueue
{
- protected TemporaryQueueImpl(String address)
+ // Sender handed in at construction; set to null once delete() has closed it.
+ private Sender _sender;
+
+ protected TemporaryQueueImpl(String address, Sender sender)
{
super(address);
+ _sender = sender;
}
+ /**
+  * Closes the sender held by this queue, if it has not already been closed.
+  * Calling delete() again after a successful close is a no-op.
+  *
+  * @throws JMSException if closing the sender fails (the underlying failure
+  *         is attached as cause and linked exception)
+  */
public void delete() throws JMSException
{
- //TODO
+ try
+ {
+     if(_sender != null)
+     {
+         _sender.close();
+         _sender = null;
+     }
+ }
+ catch (Sender.SenderClosingException e)
+ {
+     final JMSException jmsException = new JMSException(e.getMessage());
+     // Set the standard cause as well as the JMS linked exception so the
+     // root failure shows up in getCause() and stack traces.
+     jmsException.initCause(e);
+     jmsException.setLinkedException(e);
+     throw jmsException;
+ }
}
}
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TextMessageImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TextMessageImpl.java?rev=1165110&r1=1165109&r2=1165110&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TextMessageImpl.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TextMessageImpl.java Sun Sep 4 21:08:40 2011
@@ -33,19 +33,21 @@ public class TextMessageImpl extends Mes
private String _text;
protected TextMessageImpl(Header header,
+ MessageAnnotations messageAnnotations,
Properties properties,
ApplicationProperties appProperties,
String text,
Footer footer,
SessionImpl session)
{
- super(header, properties, appProperties, footer, session);
+ super(header, messageAnnotations, properties, appProperties, footer, session);
_text = text;
}
protected TextMessageImpl(final SessionImpl session)
{
- super(new Header(), new Properties(), new ApplicationProperties(new HashMap()), new Footer(Collections.EMPTY_MAP),
+ super(new Header(), new MessageAnnotations(new HashMap()),
+ new Properties(), new ApplicationProperties(new HashMap()), new Footer(Collections.EMPTY_MAP),
session);
}
@@ -68,6 +70,10 @@ public class TextMessageImpl extends Mes
{
List<Section> sections = new ArrayList<Section>();
sections.add(getHeader());
+ if(getMessageAnnotations() != null && getMessageAnnotations().getValue() != null && !getMessageAnnotations().getValue().isEmpty())
+ {
+ sections.add(getMessageAnnotations());
+ }
sections.add(getProperties());
sections.add(getApplicationProperties());
AmqpValue section = new AmqpValue(_text);
@@ -75,4 +81,6 @@ public class TextMessageImpl extends Mes
sections.add(getFooter());
return sections;
}
+
+
}
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TopicImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TopicImpl.java?rev=1165110&r1=1165109&r2=1165110&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TopicImpl.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TopicImpl.java Sun Sep 4 21:08:40 2011
@@ -49,5 +49,8 @@ public class TopicImpl extends Destinati
return topic;
}
-
+ /**
+  * Null-tolerant factory: returns null for a null address, otherwise the
+  * result of createTopic(address).
+  */
+ public static TopicImpl valueOf(String address)
+ {
+     if(address != null)
+     {
+         return createTopic(address);
+     }
+     return null;
+ }
}
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TopicSessionImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TopicSessionImpl.java?rev=1165110&r1=1165109&r2=1165110&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TopicSessionImpl.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TopicSessionImpl.java Sun Sep 4 21:08:40 2011
@@ -28,6 +28,7 @@ public class TopicSessionImpl extends Se
protected TopicSessionImpl(final ConnectionImpl connection, final AcknowledgeMode acknowledgeMode)
{
super(connection, acknowledgeMode);
+ setTopicSession(true);
}
public TopicSubscriberImpl createSubscriber(final Topic topic) throws JMSException
Modified: qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TopicSubscriberImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TopicSubscriberImpl.java?rev=1165110&r1=1165109&r2=1165110&view=diff
==============================================================================
--- qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TopicSubscriberImpl.java (original)
+++ qpid/branches/rg-amqp-1-0-sandbox/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TopicSubscriberImpl.java Sun Sep 4 21:08:40 2011
@@ -22,7 +22,7 @@ import org.apache.qpid.amqp_1_0.client.R
import org.apache.qpid.amqp_1_0.jms.Topic;
import org.apache.qpid.amqp_1_0.jms.TopicSubscriber;
-import javax.jms.JMSException;
+import javax.jms.*;
public class TopicSubscriberImpl extends MessageConsumerImpl implements TopicSubscriber
{
@@ -33,6 +33,7 @@ public class TopicSubscriberImpl extends
throws JMSException
{
super(destination, session, selector, noLocal);
+ setTopicSubscriber(true);
}
public TopicImpl getTopic() throws JMSException
@@ -41,7 +42,7 @@ public class TopicSubscriberImpl extends
}
- protected Receiver createClientReceiver()
+ protected Receiver createClientReceiver() throws javax.jms.IllegalStateException
{
return getSession().getClientSession().createCopyingReceiver(getDestination().getAddress());
}
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org