You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2014/09/23 20:20:41 UTC

[17/27] Initial drop of donated AMQP Client Code.

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacade.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacade.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacade.java
new file mode 100644
index 0000000..1131829
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacade.java
@@ -0,0 +1,674 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.provider.amqp.message;
+
+import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.JMS_AMQP_TTL;
+import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.JMS_MESSAGE;
+import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.JMS_MSG_TYPE;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import javax.jms.JMSException;
+import javax.jms.MessageFormatException;
+
+import org.apache.qpid.jms.JmsDestination;
+import org.apache.qpid.jms.message.facade.JmsMessageFacade;
+import org.apache.qpid.jms.meta.JmsMessageId;
+import org.apache.qpid.jms.provider.amqp.AmqpConnection;
+import org.apache.qpid.proton.Proton;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.UnsignedByte;
+import org.apache.qpid.proton.amqp.UnsignedInteger;
+import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
+import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
+import org.apache.qpid.proton.message.Message;
+
+/**
+ *
+ */
+public class AmqpJmsMessageFacade implements JmsMessageFacade {
+
+    private static final int DEFAULT_PRIORITY = javax.jms.Message.DEFAULT_PRIORITY;
+    private static final Charset UTF8 = Charset.forName("UTF-8");
+    private static final long MAX_UINT = 0xFFFFFFFFL;
+
+    protected final Message message;
+    protected final AmqpConnection connection;
+
+    private MessageAnnotations annotations;
+    private Map<Symbol,Object> annotationsMap;
+    private Map<String,Object> propertiesMap;
+
+    private JmsDestination replyTo;
+    private JmsDestination destination;
+
+    private Long syntheticTTL;
+
+    /**
+     * Used to record the value of JMS_AMQP_TTL property
+     * if it is explicitly set by the application
+     */
+    private Long userSpecifiedTTL = null;
+
+    /**
+     * Create a new AMQP Message Facade with an empty message instance.
+     */
+    public AmqpJmsMessageFacade(AmqpConnection connection) {
+        this.message = Proton.message();
+        this.message.setDurable(true);
+
+        this.connection = connection;
+        setAnnotation(JMS_MSG_TYPE, JMS_MESSAGE);
+    }
+
+    /**
+     * Creates a new Facade around an incoming AMQP Message for dispatch to the
+     * JMS Consumer instance.
+     * <p>
+     * If the message carries no absolute-expiry-time but its header section does
+     * carry a ttl, a synthetic expiration (current time plus ttl) is recorded so that
+     * {@link #getExpiration()} can report it.
+     *
+     * @param connection
+     *        the connection that created this Facade.
+     * @param message
+     *        the incoming Message instance that is being wrapped.
+     */
+    @SuppressWarnings("unchecked")
+    public AmqpJmsMessageFacade(AmqpConnection connection, Message message) {
+        this.message = message;
+        this.connection = connection;
+
+        annotations = message.getMessageAnnotations();
+        if (annotations != null) {
+            annotationsMap = annotations.getValue();
+        }
+
+        if (message.getApplicationProperties() != null) {
+            propertiesMap = message.getApplicationProperties().getValue();
+        }
+
+        // Read the ttl from the header section itself so that "no ttl present" can be
+        // told apart from an actual value; otherwise a message without any ttl would be
+        // given a synthetic expiration equal to the moment this facade was created.
+        UnsignedInteger ttl = null;
+        if (message.getHeader() != null) {
+            ttl = message.getHeader().getTtl();
+        }
+
+        Long absoluteExpiryTime = getAbsoluteExpiryTime();
+        if (absoluteExpiryTime == null && ttl != null) {
+            syntheticTTL = System.currentTimeMillis() + ttl.longValue();
+        }
+
+        // TODO - Set destination
+        // TODO - Set replyTo
+    }
+
+    /**
+     * @return the appropriate byte value that indicates the type of message this is.
+     */
+    public byte getJmsMsgType() {
+        return JMS_MESSAGE;
+    }
+
+    /**
+     * Returns the content type currently set on the wrapped AMQP message.  Subclasses can
+     * override this to return the content type value that matches their payload.
+     *
+     * @return the content type value reported by the wrapped message.
+     */
+    public String getContentType() {
+        return message.getContentType();
+    }
+
+    public void setContentType(String value) {
+        message.setContentType(value);
+    }
+
+    @Override
+    public boolean isEmpty() {
+        return true;
+    }
+
+    /**
+     * Returns a read-only snapshot of the application properties of this message.
+     * <p>
+     * The returned map is a copy; later changes to the message are not reflected in it
+     * and it cannot be used to modify the message.  Reading the properties never
+     * alters the message.
+     *
+     * @return an unmodifiable map of the application properties, empty if none are set.
+     *
+     * @throws JMSException declared by the facade interface; not thrown by this implementation.
+     */
+    @Override
+    public Map<String, Object> getProperties() throws JMSException {
+        if (propertiesMap == null) {
+            // Nothing has been set yet, avoid creating an application properties
+            // section on the message just to answer a read.
+            return Collections.<String, Object>emptyMap();
+        }
+
+        return Collections.unmodifiableMap(new HashMap<String, Object>(propertiesMap));
+    }
+
+    @Override
+    public boolean propertyExists(String key) throws JMSException {
+        return AmqpJmsMessagePropertyIntercepter.getProperty(this, key) != null;
+    }
+
+    public boolean applicationPropertyExists(String key) throws JMSException {
+        if (propertiesMap != null) {
+            return propertiesMap.containsKey(key);
+        }
+
+        return false;
+    }
+
+    /**
+     * Returns a set of all the property names that have been set in this message.
+     *
+     * @return a set of property names in the message or an empty set if none are set.
+     */
+    public Set<String> getPropertyNames() {
+        Set<String> properties = AmqpJmsMessagePropertyIntercepter.getPropertyNames(this);
+        if (propertiesMap != null) {
+            properties.addAll(propertiesMap.keySet());
+        }
+        return properties;
+    }
+
+    @Override
+    public Object getProperty(String key) throws JMSException {
+        return AmqpJmsMessagePropertyIntercepter.getProperty(this, key);
+    }
+
+    public Object getApplicationProperty(String key) throws JMSException {
+        if (propertiesMap != null) {
+            return propertiesMap.get(key);
+        }
+
+        return null;
+    }
+
+    @Override
+    public void setProperty(String key, Object value) throws JMSException {
+        if (key == null) {
+            throw new IllegalArgumentException("Property key must not be null");
+        }
+
+        AmqpJmsMessagePropertyIntercepter.setProperty(this, key, value);
+    }
+
+    public void setApplicationProperty(String key, Object value) throws JMSException {
+        if (propertiesMap == null) {
+            lazyCreateProperties();
+        }
+
+        propertiesMap.put(key, value);
+    }
+
+    /**
+     * Prepares the wrapped message for sending by writing the content type (when one
+     * is reported) and the JMS message type annotation onto it.
+     */
+    @Override
+    public void onSend() throws JMSException {
+        // Both accessors are public and non-final, so the values used here are
+        // whatever the concrete facade type reports.
+        String contentType = getContentType();
+        byte jmsMsgType = getJmsMsgType();
+
+        if (contentType != null) {
+            message.setContentType(contentType);
+        }
+        // Always (re)write the message type annotation, replacing any earlier value.
+        setAnnotation(JMS_MSG_TYPE, jmsMsgType);
+    }
+
+    @Override
+    public void clearBody() {
+        message.setBody(null);
+    }
+
+    /**
+     * Clears the application properties of this message along with the AMQP fields
+     * that are exposed to the application as message properties (the JMS_AMQP_TTL
+     * override, reply-to-group-id, user-id, group-id and group-sequence).
+     */
+    @Override
+    public void clearProperties() {
+        // Must not call clearProperties() here, that would recurse until the stack overflows.
+        clearAllApplicationProperties();
+        userSpecifiedTTL = null;
+        message.setReplyToGroupId(null);
+        message.setUserId(null);
+        message.setGroupId(null);
+
+        // Clear the group sequence directly on the properties section, if there is one.
+        if (message.getProperties() != null) {
+            message.getProperties().setGroupSequence(null);
+        }
+
+        // TODO - Clear others as needed.
+    }
+
+    /**
+     * Creates a new facade of this type and passes it to {@link #copyInto(AmqpJmsMessageFacade)}.
+     * <p>
+     * NOTE(review): the new facade is constructed around the very same wrapped Message
+     * instance as this one, and copyInto in this class does not copy anything yet, so
+     * the result appears to share state with the original - confirm whether a deep
+     * copy of the message is intended here.
+     *
+     * @return a new facade instance wrapping this facade's message.
+     */
+    @Override
+    public JmsMessageFacade copy() throws JMSException {
+        AmqpJmsMessageFacade copy = new AmqpJmsMessageFacade(connection, message);
+        copyInto(copy);
+        return copy;
+    }
+
+    protected void copyInto(AmqpJmsMessageFacade target) {
+        // TODO - Copy message.
+    }
+
+    /**
+     * Returns the message-id of the wrapped message as a JmsMessageId.
+     *
+     * @return the message id, or null if the wrapped message has none.
+     *
+     * @throws RuntimeException if the underlying message-id is not a String.
+     */
+    @Override
+    public JmsMessageId getMessageId() {
+        final Object underlyingId = message.getMessageId();
+        if (underlyingId == null) {
+            //TODO: returning a null JmsMessageId object leads to NPE during delivery processing
+            return null;
+        }
+
+        if (!(underlyingId instanceof String)) {
+            // TODO
+            throw new RuntimeException("No support for non-String IDs yet.");
+        }
+
+        return new JmsMessageId((String) underlyingId);
+    }
+
+    @Override
+    public void setMessageId(JmsMessageId messageId) {
+        if (messageId != null) {
+            message.setMessageId(messageId.toString());
+        } else {
+            message.setMessageId(null);
+        }
+    }
+
+    @Override
+    public long getTimestamp() {
+        if (message.getProperties() != null) {
+            Date timestamp = message.getProperties().getCreationTime();
+            if (timestamp != null) {
+                return timestamp.getTime();
+            }
+        }
+
+        return 0L;
+    }
+
+    /**
+     * Sets the creation time of the wrapped message.
+     *
+     * @param timestamp
+     *        the creation time in milliseconds, or zero to clear any existing value.
+     */
+    @Override
+    public void setTimestamp(long timestamp) {
+        if (timestamp != 0) {
+            // Set through the Message API so the value is stored even when the
+            // message has no properties section yet (e.g. a newly created message).
+            message.setCreationTime(timestamp);
+        } else if (message.getProperties() != null) {
+            message.getProperties().setCreationTime(null);
+        }
+    }
+
+    @Override
+    public String getCorrelationId() {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    @Override
+    public void setCorrelationId(String correlationId) {
+        // TODO Auto-generated method stub
+
+    }
+
+    /**
+     * Returns the correlation-id of the wrapped message as a byte array.
+     *
+     * @return a new array holding the remaining bytes of the binary correlation-id,
+     *         or null if no correlation-id is set.
+     *
+     * @throws JMSException if a correlation-id is set but is not a ByteBuffer.
+     */
+    @Override
+    public byte[] getCorrelationIdBytes() throws JMSException {
+        final Object rawId = message.getCorrelationId();
+        if (rawId == null) {
+            return null;
+        }
+
+        if (!(rawId instanceof ByteBuffer)) {
+            // TODO - Do we need to throw here, or could we just stringify whatever is in
+            //        there and return the UTF-8 bytes?  This method is pretty useless so
+            //        maybe we just return something and let the user sort if out if they
+            //        really think they need this.
+            throw new JMSException("The underlying correlation-id is not binary and so can't be returned");
+        }
+
+        // Read through a duplicate so the position of the stored buffer is not moved.
+        final ByteBuffer view = ((ByteBuffer) rawId).duplicate();
+        final byte[] result = new byte[view.remaining()];
+        view.get(result);
+        return result;
+    }
+
+    @Override
+    public void setCorrelationIdBytes(byte[] correlationId) {
+        if (correlationId == null) {
+            message.setCorrelationId(correlationId);
+        } else {
+            byte[] bytes = Arrays.copyOf(correlationId, correlationId.length);
+            message.setCorrelationId(ByteBuffer.wrap(bytes));
+        }
+    }
+
+    @Override
+    public boolean isPersistent() {
+        return message.isDurable();
+    }
+
+    @Override
+    public void setPersistent(boolean value) {
+        this.message.setDurable(value);
+    }
+
+    @Override
+    public int getRedeliveryCounter() {
+        if (message.getHeader() != null) {
+            UnsignedInteger count = message.getHeader().getDeliveryCount();
+            if (count != null) {
+                return count.intValue();
+            }
+        }
+
+        return 0;
+    }
+
+    @Override
+    public void setRedeliveryCounter(int redeliveryCount) {
+        if (redeliveryCount == 0) {
+            if (message.getHeader() != null) {
+                message.getHeader().setDeliveryCount(null);
+            }
+        } else {
+            message.setDeliveryCount(redeliveryCount);
+        }
+    }
+
+    @Override
+    public boolean isRedelivered() {
+        return getRedeliveryCounter() > 0;
+    }
+
+    /**
+     * Marks the message as redelivered or not by adjusting the redelivery counter.
+     * The counter is left alone when the message is already in the requested state.
+     *
+     * @param redelivered
+     *        true to ensure a redelivery count of at least one, false to reset it to zero.
+     */
+    @Override
+    public void setRedelivered(boolean redelivered) {
+        if (redelivered == isRedelivered()) {
+            // Already in the requested state, keep the current count as it is.
+            return;
+        }
+
+        setRedeliveryCounter(redelivered ? 1 : 0);
+    }
+
+    /**
+     * Returns the JMSType value of this message, which is carried in the subject
+     * field of the wrapped AMQP message.
+     * <p>
+     * The JMS_MSG_TYPE message annotation is deliberately not used for this: it holds
+     * the byte message-type marker written by the constructor and by onSend, so it
+     * can neither be read as a String nor keep an application supplied value.
+     *
+     * @return the subject of the wrapped message as reported by it.
+     */
+    @Override
+    public String getType() {
+        return message.getSubject();
+    }
+
+    /**
+     * Sets the JMSType value of this message by storing it in the subject field
+     * of the wrapped AMQP message.
+     *
+     * @param type
+     *        the JMSType value to store, passed through to the wrapped message unchanged.
+     */
+    @Override
+    public void setType(String type) {
+        message.setSubject(type);
+    }
+
+    @Override
+    public byte getPriority() {
+        if (message.getHeader() != null) {
+            UnsignedByte priority = message.getHeader().getPriority();
+            if (priority != null) {
+                return priority.byteValue();
+            }
+        }
+
+        return DEFAULT_PRIORITY;
+    }
+
+    /**
+     * Sets the priority of the wrapped message.  The default priority is represented
+     * by the absence of a value, so it is cleared from an existing header rather
+     * than written.
+     *
+     * @param priority
+     *        the priority value to apply to the message.
+     */
+    @Override
+    public void setPriority(byte priority) {
+        if (priority != DEFAULT_PRIORITY) {
+            message.setPriority(priority);
+            return;
+        }
+
+        // Default priority: only an already existing header needs to be touched.
+        if (message.getHeader() != null) {
+            message.getHeader().setPriority(null);
+        }
+    }
+
+    /**
+     * Returns the expiration time of this message.  The absolute expiry time of the
+     * wrapped message takes precedence, then the synthetic value held by this facade.
+     *
+     * @return the expiration time in milliseconds, or zero if neither value is present.
+     */
+    @Override
+    public long getExpiration() {
+        final Long absoluteExpiry = getAbsoluteExpiryTime();
+        if (absoluteExpiry != null) {
+            return absoluteExpiry;
+        }
+
+        return syntheticTTL != null ? syntheticTTL : 0L;
+    }
+
+    @Override
+    public void setExpiration(long expiration) {
+        syntheticTTL = null;
+
+        if (expiration != 0) {
+            setAbsoluteExpiryTime(expiration);
+        } else {
+            setAbsoluteExpiryTime(null);
+        }
+    }
+
+    public void setAmqpTimeToLive(Object value) throws MessageFormatException {
+        Long ttl = null;
+        if (value instanceof Long) {
+            ttl = (Long) value;
+        }
+
+        if (ttl != null && ttl >= 0 && ttl <= MAX_UINT) {
+            userSpecifiedTTL = ttl;
+        } else {
+            throw new MessageFormatException(JMS_AMQP_TTL + " must be a long with value in range 0 to 2^31 - 1");
+        }
+    }
+
+    /**
+     * Returns the application specified JMS_AMQP_TTL value.
+     *
+     * @return the recorded ttl value, or zero if the application has not set one.
+     */
+    public long getAmqpTimeToLive() {
+        // The field is a nullable Long, guard the unboxing so an unset value does not throw.
+        return userSpecifiedTTL != null ? userSpecifiedTTL : 0L;
+    }
+
+    @Override
+    public JmsDestination getDestination() {
+        return destination;
+    }
+
+    @Override
+    public void setDestination(JmsDestination destination) {
+        this.destination = destination;
+
+        // TODO
+    }
+
+    @Override
+    public JmsDestination getReplyTo() {
+        return replyTo;
+    }
+
+    @Override
+    public void setReplyTo(JmsDestination replyTo) {
+        this.replyTo = replyTo;
+        // TODO Auto-generated method stub
+    }
+
+    public void setReplyToGroupId(String replyToGroupId) {
+        message.setReplyToGroupId(replyToGroupId);
+    }
+
+    public String getReplyToGroupId() {
+        return message.getReplyToGroupId();
+    }
+
+    @Override
+    public String getUserId() {
+        String userId = null;
+        byte[] userIdBytes = message.getUserId();
+
+        if (userIdBytes != null) {
+            userId = new String(userIdBytes, UTF8);
+        }
+
+        return userId;
+    }
+
+    /**
+     * Sets the user-id of the wrapped message from the UTF-8 encoding of the given String.
+     *
+     * @param userId
+     *        the user id to set, or null to pass a null user-id to the wrapped message.
+     */
+    @Override
+    public void setUserId(String userId) {
+        if (userId == null) {
+            message.setUserId(null);
+        } else {
+            message.setUserId(userId.getBytes(UTF8));
+        }
+    }
+
+    @Override
+    public String getGroupId() {
+        return message.getGroupId();
+    }
+
+    @Override
+    public void setGroupId(String groupId) {
+        message.setGroupId(groupId);
+    }
+
+    @Override
+    public int getGroupSequence() {
+        if (message.getProperties() != null) {
+            UnsignedInteger sequence = message.getProperties().getGroupSequence();
+            if (sequence != null) {
+                return sequence.intValue();
+            }
+        }
+
+        return 0;
+    }
+
+    /**
+     * Sets the group sequence of the wrapped message.
+     *
+     * @param groupSequence
+     *        a positive value to store, or zero (or a negative value) to clear any
+     *        group sequence currently held by the message.
+     */
+    @Override
+    public void setGroupSequence(int groupSequence) {
+        if (groupSequence > 0) {
+            message.setGroupSequence(groupSequence);
+        } else if (message.getProperties() != null) {
+            // Zero must clear as well, otherwise the value can never be reset once set.
+            message.getProperties().setGroupSequence(null);
+        }
+    }
+
+    /**
+     * @return the true AMQP Message instance wrapped by this Facade.
+     */
+    public Message getAmqpMessage() {
+        return this.message;
+    }
+
+    /**
+     * The AmqpConnection instance that is associated with this Message.
+     * @return
+     */
+    public AmqpConnection getConnection() {
+        return connection;
+    }
+
+    /**
+     * Checks for the presence of a given message annotation and returns true
+     * if it is contained in the current annotations.  If the annotations have
+     * not yet been initialized then this method always returns false.
+     *
+     * @param key
+     *        the name of the annotation to query for.
+     *
+     * @return true if the annotation is present, false in not or annotations not initialized.
+     */
+    boolean annotationExists(String key) {
+        if (annotationsMap == null) {
+            return false;
+        }
+
+        return annotationsMap.containsKey(AmqpMessageSupport.getSymbol(key));
+    }
+
+    /**
+     * Given an annotation name, lookup and return the value associated with that
+     * annotation name.  If the message annotations have not been created yet then
+     * this method will always return null.
+     *
+     * @param key
+     *        the Symbol name that should be looked up in the message annotations.
+     *
+     * @return the value of the annotation if it exists, or null if not set or not accessible.
+     */
+    Object getAnnotation(String key) {
+        if (annotationsMap == null) {
+            return null;
+        }
+
+        return annotationsMap.get(AmqpMessageSupport.getSymbol(key));
+    }
+
+    /**
+     * Removes a message annotation if the message contains it.  Will not do
+     * a lazy create on the message annotations so caller cannot count on the
+     * existence of the message annotations after a call to this method.
+     *
+     * @param key
+     *        the annotation key that is to be removed from the current set.
+     */
+    void removeAnnotation(String key) {
+        if (annotationsMap == null) {
+            return;
+        }
+
+        annotationsMap.remove(AmqpMessageSupport.getSymbol(key));
+    }
+
+    /**
+     * Perform a proper annotation set on the AMQP Message based on a Symbol key and
+     * the target value to append to the current annotations.
+     *
+     * @param key
+     *        The name of the Symbol whose value is being set.
+     * @param value
+     *        The new value to set in the annotations of this message.
+     */
+    void setAnnotation(String key, Object value) {
+        lazyCreateAnnotations();
+        annotationsMap.put(AmqpMessageSupport.getSymbol(key), value);
+    }
+
+    /**
+     * Removes all message annotations from this message.
+     */
+    void clearAnnotations() {
+        annotationsMap = null;
+        annotations = null;
+        message.setMessageAnnotations(null);
+    }
+
+    /**
+     * Removes all application level properties from the Message.
+     */
+    void clearAllApplicationProperties() {
+        propertiesMap = null;
+        message.setApplicationProperties(null);
+    }
+
+    private Long getAbsoluteExpiryTime() {
+        Long result = null;
+        if (message.getProperties() != null) {
+            Date date = message.getProperties().getAbsoluteExpiryTime();
+            if (date != null) {
+                result = date.getTime();
+            }
+        }
+
+        return result;
+    }
+
+    private void setAbsoluteExpiryTime(Long expiration) {
+        if (expiration == null) {
+            if (message.getProperties() != null) {
+                message.getProperties().setAbsoluteExpiryTime(null);
+            }
+        } else {
+            message.setExpiryTime(expiration);
+        }
+    }
+
+    private void lazyCreateAnnotations() {
+        if (annotationsMap == null) {
+            annotationsMap = new HashMap<Symbol,Object>();
+            annotations = new MessageAnnotations(annotationsMap);
+            message.setMessageAnnotations(annotations);
+        }
+    }
+
+    /**
+     * Creates the application properties map and installs it on the wrapped message,
+     * but only when no map exists yet so that properties already present are kept.
+     */
+    private void lazyCreateProperties() {
+        if (propertiesMap == null) {
+            propertiesMap = new HashMap<String,Object>();
+            message.setApplicationProperties(new ApplicationProperties(propertiesMap));
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFactory.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFactory.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFactory.java
new file mode 100644
index 0000000..882c2ac
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFactory.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.provider.amqp.message;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+import javax.jms.JMSException;
+
+import org.apache.qpid.jms.exceptions.JmsExceptionSupport;
+import org.apache.qpid.jms.message.JmsBytesMessage;
+import org.apache.qpid.jms.message.JmsMapMessage;
+import org.apache.qpid.jms.message.JmsMessage;
+import org.apache.qpid.jms.message.JmsMessageFactory;
+import org.apache.qpid.jms.message.JmsObjectMessage;
+import org.apache.qpid.jms.message.JmsStreamMessage;
+import org.apache.qpid.jms.message.JmsTextMessage;
+import org.apache.qpid.jms.message.facade.JmsObjectMessageFacade;
+import org.apache.qpid.jms.message.facade.JmsTextMessageFacade;
+import org.apache.qpid.jms.message.facade.defaults.JmsDefaultBytesMessageFacade;
+import org.apache.qpid.jms.message.facade.defaults.JmsDefaultMapMessageFacade;
+import org.apache.qpid.jms.message.facade.defaults.JmsDefaultMessageFacade;
+import org.apache.qpid.jms.message.facade.defaults.JmsDefaultObjectMessageFacade;
+import org.apache.qpid.jms.message.facade.defaults.JmsDefaultStreamMessageFacade;
+import org.apache.qpid.jms.message.facade.defaults.JmsDefaultTextMessageFacade;
+import org.apache.qpid.jms.provider.amqp.AmqpConnection;
+
+/**
+ * AMQP Message Factory instance used to create new JmsMessage types that wrap an
+ * Proton AMQP Message.  This class is used by the JMS layer to create its JMS
+ * Message instances, the messages returned here should be created in a proper
+ * initially empty state for the client to populate.
+ * <p>
+ * Every message is currently backed by one of the JmsDefault*Facade types; the
+ * AMQP specific facade constructions are kept as commented out lines next to them.
+ */
+public class AmqpJmsMessageFactory implements JmsMessageFactory {
+
+    private AmqpConnection connection;
+
+    /**
+     * Creates a factory without a connection, one can be assigned later using
+     * {@link #setAmqpConnection(AmqpConnection)}.
+     */
+    public AmqpJmsMessageFactory() {
+    }
+
+    /**
+     * Creates a factory associated with the given connection.
+     *
+     * @param connection
+     *        the AmqpConnection to associate with this factory.
+     */
+    public AmqpJmsMessageFactory(AmqpConnection connection) {
+        this.connection = connection;
+    }
+
+    /**
+     * @return the AmqpConnection currently associated with this factory.
+     */
+    public AmqpConnection getAmqpConnection() {
+        return connection;
+    }
+
+    /**
+     * @param connection
+     *        the AmqpConnection to associate with this factory.
+     */
+    public void setAmqpConnection(AmqpConnection connection) {
+        this.connection = connection;
+    }
+
+    /**
+     * @return a new JmsMessage backed by a default message facade.
+     */
+    @Override
+    public JmsMessage createMessage() throws JMSException {
+        //return new JmsMessage(new AmqpJmsMessageFacade(connection));
+        return new JmsMessage(new JmsDefaultMessageFacade());
+    }
+
+    /**
+     * @return a new JmsTextMessage with no text assigned.
+     */
+    @Override
+    public JmsTextMessage createTextMessage() throws JMSException {
+        return createTextMessage(null);
+    }
+
+    /**
+     * @param payload
+     *        the text to assign to the new message, skipped when null.
+     *
+     * @return a new JmsTextMessage backed by a default text facade.
+     */
+    @Override
+    public JmsTextMessage createTextMessage(String payload) throws JMSException {
+
+        // JmsTextMessageFacade facade = new AmqpJmsTextMessageFacade(connection);
+        final JmsTextMessageFacade textFacade = new JmsDefaultTextMessageFacade();
+
+        if (payload != null) {
+            textFacade.setText(payload);
+        }
+
+        return new JmsTextMessage(textFacade);
+    }
+
+    /**
+     * @return a new JmsBytesMessage backed by a default bytes facade.
+     */
+    @Override
+    public JmsBytesMessage createBytesMessage() throws JMSException {
+        // return new JmsBytesMessage(new AmqpJmsBytesMessageFacade(connection));
+        return new JmsBytesMessage(new JmsDefaultBytesMessageFacade());
+    }
+
+    /**
+     * @return a new JmsMapMessage backed by a default map facade.
+     */
+    @Override
+    public JmsMapMessage createMapMessage() throws JMSException {
+        // return new JmsMapMessage(new AmqpJmsMapMessageFacade(connection));
+        return new JmsMapMessage(new JmsDefaultMapMessageFacade());
+    }
+
+    /**
+     * @return a new JmsStreamMessage backed by a default stream facade.
+     */
+    @Override
+    public JmsStreamMessage createStreamMessage() throws JMSException {
+        // return new JmsStreamMessage(new AmqpJmsStreamMessageFacade(connection));
+        return new JmsStreamMessage(new JmsDefaultStreamMessageFacade());
+    }
+
+    /**
+     * @return a new JmsObjectMessage with no object assigned.
+     */
+    @Override
+    public JmsObjectMessage createObjectMessage() throws JMSException {
+        return createObjectMessage(null);
+    }
+
+    /**
+     * @param payload
+     *        the object to assign to the new message, skipped when null.
+     *
+     * @return a new JmsObjectMessage backed by a default object facade.
+     *
+     * @throws JMSException if assigning the payload to the facade fails with an IOException.
+     */
+    @Override
+    public JmsObjectMessage createObjectMessage(Serializable payload) throws JMSException {
+
+        // JmsObjectMessageFacade facade = new AmqpJmsSerializedObjectMessageFacade(connection);
+        final JmsObjectMessageFacade objectFacade = new JmsDefaultObjectMessageFacade();
+
+        if (payload == null) {
+            return new JmsObjectMessage(objectFacade);
+        }
+
+        try {
+            objectFacade.setObject(payload);
+        } catch (IOException ioe) {
+            throw JmsExceptionSupport.create(ioe);
+        }
+
+        return new JmsObjectMessage(objectFacade);
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessagePropertyIntercepter.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessagePropertyIntercepter.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessagePropertyIntercepter.java
new file mode 100644
index 0000000..11d10a3
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessagePropertyIntercepter.java
@@ -0,0 +1,377 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.provider.amqp.message;
+
+import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.JMS_AMQP_REPLY_TO_GROUP_ID;
+import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.JMS_AMQP_TTL;
+import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.JMS_AMQP_TYPED_ENCODING;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import javax.jms.JMSException;
+import javax.jms.MessageFormatException;
+
+import org.apache.qpid.jms.util.TypeConversionSupport;
+
+/**
+ * Utility class used to intercept calls to Message property sets and gets and map the
+ * correct AMQP fields to the property name being accessed.
+ */
+public class AmqpJmsMessagePropertyIntercepter {
+
+    private static final Map<String, PropertyIntercepter> PROPERTY_INTERCEPTERS = new HashMap<String, PropertyIntercepter>();
+
+    /**
+     * Interface for a Property intercepter object used to write JMS style
+     * properties that are part of the JMS Message object members or perform
+     * some needed conversion action before some named property is read or
+     * written.  If a property is not writable then the intercepter should
+     * throw a JMSException to indicate the error.
+     */
+    interface PropertyIntercepter {
+
+        /**
+         * Called when the named property is queried from a JMS Message object.
+         *
+         * @param message
+         *        The message being acted upon.
+         *
+         * @return the correct property value from the given Message.
+         *
+         * @throws JMSException if an error occurs while accessing the property
+         */
+        Object getProperty(AmqpJmsMessageFacade message) throws JMSException;
+
+        /**
+         * Called when the named property is assigned from a JMS Message object.
+         *
+         * @param message
+         *        The message instance being acted upon.
+         * @param value
+         *        The value to assign to the intercepted property.
+         *
+         * @throws JMSException if an error occurs writing the property.
+         */
+        void setProperty(AmqpJmsMessageFacade message, Object value) throws JMSException;
+
+        /**
+         * Indicates if the intercepted property has a value currently assigned.
+         *
+         * @param message
+         *        The message instance being acted upon.
+         *
+         * @return true if the intercepted property has a value assigned to it.
+         */
+        boolean propertyExists(AmqpJmsMessageFacade message);
+
+    }
+
+    static {
+        PROPERTY_INTERCEPTERS.put(JMS_AMQP_TTL, new PropertyIntercepter() {
+            @Override
+            public Object getProperty(AmqpJmsMessageFacade message) throws JMSException {
+                return message.getAmqpTimeToLive();
+            }
+
+            @Override
+            public void setProperty(AmqpJmsMessageFacade message, Object value) throws JMSException {
+                Long rc = (Long) TypeConversionSupport.convert(value, Long.class);
+                if (rc == null) {
+                    throw new JMSException("Property " + JMS_AMQP_TTL + " cannot be set from a " + value.getClass().getName() + ".");
+                }
+                message.setAmqpTimeToLive(rc.longValue());
+            }
+
+            @Override
+            public boolean propertyExists(AmqpJmsMessageFacade message) {
+                return message.getAmqpTimeToLive() != 0;
+            }
+        });
+        PROPERTY_INTERCEPTERS.put(JMS_AMQP_REPLY_TO_GROUP_ID, new PropertyIntercepter() {
+            @Override
+            public Object getProperty(AmqpJmsMessageFacade message) throws JMSException {
+                return message.getReplyToGroupId();
+            }
+
+            @Override
+            public void setProperty(AmqpJmsMessageFacade message, Object value) throws JMSException {
+                String rc = (String) TypeConversionSupport.convert(value, String.class);
+                if (rc == null) {
+                    throw new JMSException("Property " + JMS_AMQP_REPLY_TO_GROUP_ID + " cannot be set from a " + value.getClass().getName() + ".");
+                }
+                message.setReplyToGroupId(rc);
+            }
+
+            @Override
+            public boolean propertyExists(AmqpJmsMessageFacade message) {
+                return message.getReplyToGroupId() != null;
+            }
+        });
+        PROPERTY_INTERCEPTERS.put(JMS_AMQP_TYPED_ENCODING, new PropertyIntercepter() {
+            @Override
+            public Object getProperty(AmqpJmsMessageFacade message) throws JMSException {
+                if (message instanceof AmqpJmsObjectMessageFacade) {
+                    return ((AmqpJmsObjectMessageFacade) message).isAmqpTypedEncoding();
+                }
+
+                return false;
+            }
+
+            /**
+             * Validates that the value converts to a Boolean and that the target is an
+             * ObjectMessage facade.  The converted value is not yet applied to the message.
+             *
+             * @throws JMSException if the value cannot be converted to a Boolean.
+             * @throws MessageFormatException if the message is not an ObjectMessage facade.
+             */
+            @Override
+            public void setProperty(AmqpJmsMessageFacade message, Object value) throws JMSException {
+                // The conversion target is Boolean, so the result must be held as a Boolean;
+                // casting it to Integer fails with a ClassCastException on every valid value.
+                Boolean rc = (Boolean) TypeConversionSupport.convert(value, Boolean.class);
+                if (rc == null) {
+                    // Guard the class lookup so a null value is reported, not dereferenced.
+                    String type = value == null ? "null" : value.getClass().getName();
+                    throw new JMSException("Property " + JMS_AMQP_TYPED_ENCODING + " cannot be set from a " + type + ".");
+                }
+
+                // TODO - Finished Typed encoding work.
+                if (message instanceof AmqpJmsObjectMessageFacade) {
+                    // ((AmqpJmsSerializedObjectMessageFacade) message)
+                } else {
+                    throw new MessageFormatException(JMS_AMQP_TYPED_ENCODING + " is only applicable to ObjectMessage");
+                }
+            }
+
+            @Override
+            public boolean propertyExists(AmqpJmsMessageFacade message) {
+                if (message instanceof AmqpJmsObjectMessageFacade) {
+                    // TODO - See notes in AmqpObjectMessageFacade about whether this should
+                    //        always be exposed for ObjectMessage or only if it's currently
+                    //        the case that the message uses the AMQP typed encoding.
+                    return ((AmqpJmsObjectMessageFacade) message).isAmqpTypedEncoding();
+                }
+
+                return false;
+            }
+        });
+    }
+
+    /**
+     * Looks up the value of a named property.  A name with a registered intercepter
+     * is answered by that intercepter; any other name is read from the application
+     * properties of the given AmqpJmsMessageFacade.
+     *
+     * @param message
+     *        the AmqpJmsMessageFacade instance to read from
+     * @param name
+     *        the property name that is being requested.
+     *
+     * @return the value supplied by the intercepter or by the message's application properties.
+     *
+     * @throws JMSException if an error occurs while reading the defined property.
+     */
+    public static Object getProperty(AmqpJmsMessageFacade message, String name) throws JMSException {
+        PropertyIntercepter intercepter = PROPERTY_INTERCEPTERS.get(name);
+        if (intercepter == null) {
+            return message.getApplicationProperty(name);
+        }
+
+        return intercepter.getProperty(message);
+    }
+
+    /**
+     * Static set method that takes a property name and sets the value either via
+     * a registered property set object or through the AmqpJmsMessageFacade setProperty
+     * method.
+     *
+     * @param message
+     *        the AmqpJmsMessageFacade instance to write to.
+     * @param name
+     *        the property name that is being written.
+     * @param value
+     *        the new value to assign for the named property.
+     *
+     * @throws JMSException if an error occurs while writing the defined property.
+     */
+    public static void setProperty(AmqpJmsMessageFacade message, String name, Object value) throws JMSException {
+        PropertyIntercepter propertyExpression = PROPERTY_INTERCEPTERS.get(name);
+        if (propertyExpression != null) {
+            propertyExpression.setProperty(message, value);
+        } else {
+            message.setApplicationProperty(name, value);
+        }
+    }
+
+    /**
+     * Static query method to determine if a specific property exists in the given message.
+     * Intercepted property names are answered by their registered intercepter; all other
+     * names are checked against the message's application properties.
+     *
+     * @param message
+     *        the AmqpJmsMessageFacade instance to inspect.
+     * @param name
+     *        the property name that is being checked.
+     *
+     * @return true if the named property currently has a value in the given message.
+     *
+     * @throws JMSException if an error occurs while inspecting the defined property.
+     */
+    public static boolean propertyExists(AmqpJmsMessageFacade message, String name) throws JMSException {
+        PropertyIntercepter propertyExpression = PROPERTY_INTERCEPTERS.get(name);
+        if (propertyExpression != null) {
+            // Hand the answer back to the caller instead of discarding it.
+            return propertyExpression.propertyExists(message);
+        }
+
+        return message.applicationPropertyExists(name);
+    }
+
+    /**
+     * For each of the currently configured message property intercepter instance a
+     * string key value is inserted into a new Set and returned.  The returned Set is
+     * a copy; changing it does not affect the registered intercepters.
+     *
+     * @return a {@code Set<String>} containing the names of all intercepted properties.
+     */
+    public static Set<String> getAllPropertyNames() {
+        // keySet() is a live view of the registry: handing it out directly would let a
+        // caller unregister intercepters simply by removing entries from the result.
+        return new HashSet<String>(PROPERTY_INTERCEPTERS.keySet());
+    }
+
+    /**
+     * For each of the currently configured message property intercepter instance a
+     * string key value is inserted into an Set and returned if the property has a
+     * value and is available for a read operation.
+     *
+     * @return a Set<String> containing the names of all intercepted properties with a value.
+     */
+    public static Set<String> getPropertyNames(AmqpJmsMessageFacade message) {
+        Set<String> names = new HashSet<String>();
+        for (Entry<String, PropertyIntercepter> entry : PROPERTY_INTERCEPTERS.entrySet()) {
+            if (entry.getValue().propertyExists(message)) {
+                names.add(entry.getKey());
+            }
+        }
+        return names;
+    }
+
+    /**
+     * Allows for the additional PropertyIntercepter instances to be added to the global set.
+     *
+     * @param propertyName
+     *        The name of the Message property that will be intercepted.
+     * @param getter
+     *        The PropertyIntercepter instance that should be used for the named property.
+     */
+    public static void addPropertyIntercepter(String propertyName, PropertyIntercepter getter) {
+        PROPERTY_INTERCEPTERS.put(propertyName, getter);
+    }
+
+    /**
+     * Removes the intercepter registered for the given property name from the
+     * global set, if one is present.
+     *
+     * NOTE(review): this is an instance method although it only touches the static
+     * registry and addPropertyIntercepter is static - confirm whether that is intended.
+     *
+     * @param propertyName
+     *        The name of the PropertyIntercepter to remove.
+     *
+     * @return true if an intercepter was removed from the global set.
+     */
+    public boolean removePropertyIntercepter(String propertyName) {
+        return PROPERTY_INTERCEPTERS.remove(propertyName) != null;
+    }
+
+    private final String name;
+    private final PropertyIntercepter propertyExpression;
+
+    /**
+     * Creates an new property getter instance that is assigned to read the named value.
+     *
+     * @param name
+     *        the property value that this getter is assigned to lookup.
+     */
+    public AmqpJmsMessagePropertyIntercepter(String name) {
+        this.name = name;
+        this.propertyExpression = PROPERTY_INTERCEPTERS.get(name);
+    }
+
+    /**
+     * Gets the correct property value from the JmsMessageFacade instance based on
+     * the predefined property mappings.
+     *
+     * @param message
+     *        the JmsMessageFacade whose property is being read.
+     *
+     * @return the correct value either mapped to an Message attribute of a Message property.
+     *
+     * @throws JMSException if an error occurs while reading the defined property.
+     */
+    public Object get(AmqpJmsMessageFacade message) throws JMSException {
+        if (propertyExpression != null) {
+            return propertyExpression.getProperty(message);
+        }
+
+        return message.getApplicationProperty(name);
+    }
+
+    /**
+     * Sets the correct property value from the AmqpJmsMessageFacade instance based on
+     * the predefined property mappings.
+     *
+     * @param message
+     *        the AmqpJmsMessageFacade whose property is being read.
+     * @param value
+     *        the value to be set on the intercepted AmqpJmsMessageFacade property.
+     *
+     * @throws JMSException if an error occurs while reading the defined property.
+     */
+    public void set(AmqpJmsMessageFacade message, Object value) throws JMSException {
+        if (propertyExpression != null) {
+            propertyExpression.setProperty(message, value);
+        } else {
+            message.setApplicationProperty(name, value);
+        }
+    }
+
+    /**
+     * @return the property name that is being intercepted for the AmqpJmsMessageFacade.
+     */
+    public String getName() {
+        return name;
+    }
+
+    /**
+     * @see java.lang.Object#toString()
+     */
+    @Override
+    public String toString() {
+        return name;
+    }
+
+    /**
+     * @see java.lang.Object#hashCode()
+     */
+    @Override
+    public int hashCode() {
+        return name.hashCode();
+    }
+
+    /**
+     * @see java.lang.Object#equals(java.lang.Object)
+     */
+    @Override
+    public boolean equals(Object o) {
+        if (o == null || !this.getClass().equals(o.getClass())) {
+            return false;
+        }
+        return name.equals(((AmqpJmsMessagePropertyIntercepter) o).name);
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsObjectMessageFacade.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsObjectMessageFacade.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsObjectMessageFacade.java
new file mode 100644
index 0000000..3696653
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsObjectMessageFacade.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.provider.amqp.message;
+
+import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.JMS_MSG_TYPE;
+import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.JMS_OBJECT_MESSAGE;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+import javax.jms.JMSException;
+
+import org.apache.qpid.jms.exceptions.JmsExceptionSupport;
+import org.apache.qpid.jms.message.facade.JmsObjectMessageFacade;
+import org.apache.qpid.jms.provider.amqp.AmqpConnection;
+import org.apache.qpid.proton.message.Message;
+
+/**
+ * Wrapper around an AMQP Message instance that will be treated as a JMS ObjectMessage
+ * type.
+ */
+public class AmqpJmsObjectMessageFacade extends AmqpJmsMessageFacade implements JmsObjectMessageFacade {
+
+    private AmqpObjectTypeDelegate delegate;
+
+    /**
+     * @param connection
+     */
+    public AmqpJmsObjectMessageFacade(AmqpConnection connection) {
+        super(connection);
+        setAnnotation(JMS_MSG_TYPE, JMS_OBJECT_MESSAGE);
+
+        // TODO Implement Connection property to control default serialization type
+        initDelegate(false);
+    }
+
+    /**
+     * @param connection
+     * @param message
+     */
+    public AmqpJmsObjectMessageFacade(AmqpConnection connection, Message message) {
+        super(connection, message);
+
+        // TODO detect the content type and init the proper delegate.
+        initDelegate(false);
+    }
+
+    /**
+     * @return the appropriate byte value that indicates the type of message this is.
+     */
+    @Override
+    public byte getJmsMsgType() {
+        return JMS_OBJECT_MESSAGE;
+    }
+
+    @Override
+    public boolean isEmpty() {
+        // TODO - If null body changes to empty AmqpValue this needs to also change.
+        return getAmqpMessage().getBody() == null;
+    }
+
+    /**
+     * @return true if the object body is handled by the AMQP typed delegate rather
+     *         than the serialized object delegate.
+     */
+    public boolean isAmqpTypedEncoding() {
+        // The field is declared as AmqpObjectTypeDelegate, so testing against that type
+        // is true for every delegate; test for the typed implementation instead.
+        return this.delegate instanceof AmqpTypedObjectDelegate;
+    }
+
+    @Override
+    public JmsObjectMessageFacade copy() throws JMSException {
+        AmqpJmsObjectMessageFacade copy = new AmqpJmsObjectMessageFacade(connection);
+        copyInto(copy);
+
+        try {
+            copy.setObject(getObject());
+        } catch (Exception e) {
+            throw JmsExceptionSupport.create("Failed to copy object value", e);
+        }
+
+        return copy;
+    }
+
+    @Override
+    public Serializable getObject() throws IOException, ClassNotFoundException {
+        return delegate.getObject();
+    }
+
+    @Override
+    public void setObject(Serializable value) throws IOException {
+        delegate.setObject(value);
+    }
+
+    /**
+     * Clears the object body by setting a null object value.
+     */
+    @Override
+    public void clearBody() {
+        try {
+            setObject(null);
+        } catch (IOException e) {
+            // NOTE(review): the failure is silently dropped here; presumably setting a
+            // null value cannot fail in the delegate - confirm, or surface the error.
+        }
+    }
+
+    @Override
+    public void onSend() {
+        // TODO instruct delegate to encode the proper content type into the message.
+    }
+
+    private void initDelegate(boolean useAmqpTypes) {
+        if (!useAmqpTypes) {
+            delegate = new AmqpSerializedObjectDelegate(getAmqpMessage());
+        } else {
+            delegate = new AmqpTypedObjectDelegate(getAmqpMessage());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsStreamMessageFacade.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsStreamMessageFacade.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsStreamMessageFacade.java
new file mode 100644
index 0000000..0999225
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsStreamMessageFacade.java
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.provider.amqp.message;
+
+import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.JMS_MSG_TYPE;
+import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.JMS_STREAM_MESSAGE;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import javax.jms.MessageEOFException;
+
+import org.apache.qpid.jms.message.facade.JmsStreamMessageFacade;
+import org.apache.qpid.jms.provider.amqp.AmqpConnection;
+import org.apache.qpid.proton.amqp.Binary;
+import org.apache.qpid.proton.amqp.messaging.AmqpValue;
+import org.apache.qpid.proton.amqp.messaging.Section;
+import org.apache.qpid.proton.message.Message;
+
+/**
+ * Wrapper around an AMQP Message instance that will be treated as a JMS StreamMessage
+ * type.
+ */
+public class AmqpJmsStreamMessageFacade extends AmqpJmsMessageFacade implements JmsStreamMessageFacade {
+
+    private List<Object> list;
+    private int position = 0;
+
+    /**
+     * Create a new facade ready for sending.
+     *
+     * @param connection
+     *        the connection instance that created this facade.
+     */
+    public AmqpJmsStreamMessageFacade(AmqpConnection connection) {
+        super(connection);
+        initializeEmptyList();
+        setAnnotation(JMS_MSG_TYPE, JMS_STREAM_MESSAGE);
+    }
+
+    /**
+     * Creates a new Facade around an incoming AMQP Message for dispatch to the
+     * JMS Consumer instance.
+     *
+     * @param connection
+     *        the connection that created this Facade.
+     * @param message
+     *        the incoming Message instance that is being wrapped.
+     */
+    @SuppressWarnings("unchecked")
+    public AmqpJmsStreamMessageFacade(AmqpConnection connection, Message message) {
+        super(connection, message);
+
+        Section body = getAmqpMessage().getBody();
+        if (body == null) {
+            initializeEmptyList();
+        } else if (body instanceof AmqpValue) {
+            Object value = ((AmqpValue) body).getValue();
+
+            if (value == null) {
+                initializeEmptyList();
+            } else if (value instanceof List) {
+                list = (List<Object>) value;
+            } else {
+                throw new IllegalStateException("Unexpected amqp-value body content type: " + value.getClass().getSimpleName());
+            }
+        } else {
+            throw new IllegalStateException("Unexpected message body type: " + body.getClass().getSimpleName());
+        }
+    }
+
+    @Override
+    public JmsStreamMessageFacade copy() {
+        AmqpJmsStreamMessageFacade copy = new AmqpJmsStreamMessageFacade(connection);
+        copyInto(copy);
+        copy.list.addAll(list);
+        return copy;
+    }
+
+    /**
+     * @return the appropriate byte value that indicates the type of message this is.
+     */
+    @Override
+    public byte getJmsMsgType() {
+        return JMS_STREAM_MESSAGE;
+    }
+
+    @Override
+    public boolean hasNext() {
+        return !list.isEmpty() && position < list.size();
+    }
+
+    @Override
+    public Object peek() throws MessageEOFException {
+        if (list.isEmpty() || position >= list.size()) {
+            throw new MessageEOFException("Attempt to read past end of stream");
+        }
+
+        Object object = list.get(position);
+        if (object instanceof Binary) {
+            // Copy to a byte[], ensure we copy only the required portion.
+            Binary bin = ((Binary) object);
+            object = Arrays.copyOfRange(bin.getArray(), bin.getArrayOffset(), bin.getLength());
+        }
+
+        return object;
+    }
+
+    @Override
+    public void pop() throws MessageEOFException {
+        if (list.isEmpty() || position >= list.size()) {
+            throw new MessageEOFException("Attempt to read past end of stream");
+        }
+
+        position++;
+    }
+
+    @Override
+    public void put(Object value) {
+        Object entry = value;
+        if (entry instanceof byte[]) {
+            entry = new Binary((byte[]) value);
+        }
+
+        list.add(entry);
+    }
+
+    @Override
+    public void reset() {
+        position = 0;
+    }
+
+    @Override
+    public void clearBody() {
+        list.clear();
+        position = 0;
+    }
+
+    @Override
+    public boolean isEmpty() {
+        return list.isEmpty();
+    }
+
+    /**
+     * Installs a new empty list as both the backing store of this facade and the
+     * amqp-value body of the wrapped message.
+     */
+    private void initializeEmptyList() {
+        // Assign the field rather than a shadowing local; otherwise the field stays
+        // null and every later read or write of the stream dereferences null.
+        list = new ArrayList<Object>();
+        message.setBody(new AmqpValue(list));
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsTextMessageFacade.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsTextMessageFacade.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsTextMessageFacade.java
new file mode 100644
index 0000000..6c2421b
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsTextMessageFacade.java
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.provider.amqp.message;
+
+import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.JMS_MSG_TYPE;
+import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.JMS_TEXT_MESSAGE;
+
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.nio.charset.CharacterCodingException;
+import java.nio.charset.Charset;
+import java.nio.charset.CharsetDecoder;
+
+import javax.jms.JMSException;
+
+import org.apache.qpid.jms.exceptions.JmsExceptionSupport;
+import org.apache.qpid.jms.message.facade.JmsTextMessageFacade;
+import org.apache.qpid.jms.provider.amqp.AmqpConnection;
+import org.apache.qpid.proton.amqp.Binary;
+import org.apache.qpid.proton.amqp.messaging.AmqpValue;
+import org.apache.qpid.proton.amqp.messaging.Data;
+import org.apache.qpid.proton.amqp.messaging.Section;
+import org.apache.qpid.proton.message.Message;
+
+/**
+ * Wrapper around an AMQP Message instance that will be treated as a JMS TextMessage
+ * type.
+ */
+public class AmqpJmsTextMessageFacade extends AmqpJmsMessageFacade implements JmsTextMessageFacade {
+
+    private static final String UTF_8 = "UTF-8";
+
+    /**
+     * Content type, only to be used when message uses a data
+     * body section, and not when using an amqp-value body section
+     */
+    public static final String CONTENT_TYPE = "text/plain";
+
+    private final CharsetDecoder decoder =  Charset.forName(UTF_8).newDecoder();
+
+    /**
+     * Create a new AMQP Message facade ready for sending.
+     *
+     * @param connection
+     *        The AMQP Connection that created this message.
+     */
+    public AmqpJmsTextMessageFacade(AmqpConnection connection) {
+        super(connection);
+        setAnnotation(JMS_MSG_TYPE, JMS_TEXT_MESSAGE);
+        setText(null);
+    }
+
+    /**
+     * Creates a new Facade around an incoming AMQP Message for dispatch to the
+     * JMS Consumer instance.
+     *
+     * @param connection
+     *        the connection that created this Facade.
+     * @param message
+     *        the incoming Message instance that is being wrapped.
+     */
+    public AmqpJmsTextMessageFacade(AmqpConnection connection, Message message) {
+        super(connection, message);
+    }
+
+    /**
+     * @return the appropriate byte value that indicates the type of message this is.
+     */
+    @Override
+    public byte getJmsMsgType() {
+        return JMS_TEXT_MESSAGE;
+    }
+
+    @Override
+    public JmsTextMessageFacade copy() throws JMSException {
+        AmqpJmsTextMessageFacade copy = new AmqpJmsTextMessageFacade(connection);
+        copyInto(copy);
+        copy.setText(getText());
+        return copy;
+    }
+
+    /**
+     * Reads the text carried by the body of the wrapped AMQP message.
+     *
+     * @return null when there is no body or the amqp-value body holds null, an empty
+     *         String for a data section with no content, otherwise the body text.
+     *
+     * @throws JMSException if the content of a data section cannot be decoded as UTF-8.
+     * @throws IllegalStateException if the body section or its value is of an unexpected type.
+     */
+    @Override
+    public String getText() throws JMSException {
+        Section body = getAmqpMessage().getBody();
+
+        if (body == null) {
+            return null;
+        } else if (body instanceof Data) {
+            Data data = (Data) body;
+            if (data.getValue() == null || data.getValue().getLength() == 0) {
+                return "";
+            } else {
+                Binary b = data.getValue();
+                // Wrap only the region of the backing array that the Binary covers.
+                ByteBuffer buf = ByteBuffer.wrap(b.getArray(), b.getArrayOffset(), b.getLength());
+
+                try {
+                    CharBuffer chars = decoder.decode(buf);
+                    return String.valueOf(chars);
+                } catch (CharacterCodingException e) {
+                    throw JmsExceptionSupport.create("Cannot decode String in UTF-8", e);
+                }
+            }
+        } else if (body instanceof AmqpValue) {
+            Object value = ((AmqpValue) body).getValue();
+
+            if (value == null || value instanceof String) {
+                return (String) value;
+            } else {
+                throw new IllegalStateException("Unexpected amqp-value body content type: " + value.getClass().getSimpleName());
+            }
+        } else {
+            throw new IllegalStateException("Unexpected message body type: " + body.getClass().getSimpleName());
+        }
+    }
+
+    @Override
+    public void setText(String value) {
+        AmqpValue body = new AmqpValue(value);
+        getAmqpMessage().setBody(body);
+    }
+
+    @Override
+    public void clearBody() {
+        setText(null);
+    }
+
+    @Override
+    public boolean isEmpty() {
+        Section body = getAmqpMessage().getBody();
+
+        if (body == null) {
+            return true;
+        } else if (body instanceof Data) {
+            Data data = (Data) body;
+            if (data.getValue() == null || data.getValue().getLength() == 0) {
+                return true;
+            }
+        }
+
+        return false;
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpMessageIdHelper.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpMessageIdHelper.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpMessageIdHelper.java
new file mode 100644
index 0000000..0bda795
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpMessageIdHelper.java
@@ -0,0 +1,270 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.jms.provider.amqp.message;
+
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.util.UUID;
+
+import org.apache.qpid.jms.exceptions.IdConversionException;
+
+/**
+ * Converts message-id and correlation-id values between the object types used for them on
+ * the AMQP side and the String form used by JMS.
+ *
+ * <p>AMQP allows 4 types of message-id/correlation-id: message-id-string, message-id-binary,
+ * message-id-uuid and message-id-ulong. So that a String can represent each of them, and
+ * interoperate with other AMQP clients, the following encodings are applied to the value
+ * that remains once the "ID:" prefix of a JMSMessageID has been removed (or before it is
+ * added):<br/>
+ *
+ * "AMQP_BINARY:&lt;hex representation of binary content&gt;"<br/>
+ * "AMQP_UUID:&lt;string representation of uuid&gt;"<br/>
+ * "AMQP_ULONG:&lt;string representation of ulong&gt;"<br/>
+ * "AMQP_STRING:&lt;string&gt;"<br/>
+ *
+ * <p>The AMQP_STRING encoding is purely an escape for message-id-string values that happen
+ * to start with one of the encoding prefixes (AMQP_STRING itself included). It MUST NOT be
+ * used in any other situation.
+ *
+ * <p>A string that announces itself as an encoded binary, uuid or ulong but cannot be
+ * converted into that format causes an exception to be thrown during conversion.
+ */
+public class AmqpMessageIdHelper {
+    public static final String AMQP_STRING_PREFIX = "AMQP_STRING:";
+    public static final String AMQP_UUID_PREFIX = "AMQP_UUID:";
+    public static final String AMQP_ULONG_PREFIX = "AMQP_ULONG:";
+    public static final String AMQP_BINARY_PREFIX = "AMQP_BINARY:";
+    public static final String JMS_ID_PREFIX = "ID:";
+
+    // Prefix lengths are computed once; they are the offsets at which the payload starts.
+    private static final int JMS_ID_PREFIX_LENGTH = JMS_ID_PREFIX.length();
+    private static final int AMQP_UUID_PREFIX_LENGTH = AMQP_UUID_PREFIX.length();
+    private static final int AMQP_ULONG_PREFIX_LENGTH = AMQP_ULONG_PREFIX.length();
+    private static final int AMQP_STRING_PREFIX_LENGTH = AMQP_STRING_PREFIX.length();
+    private static final int AMQP_BINARY_PREFIX_LENGTH = AMQP_BINARY_PREFIX.length();
+
+    // Indexed by nibble value (0-15) to obtain the upper-case hex digit.
+    private static final char[] HEX_CHARS = "0123456789ABCDEF".toCharArray();
+
+    /**
+     * Tests whether the given string starts with the "ID:" prefix that denotes a JMSMessageID.
+     *
+     * @param string the string to check, may be null
+     * @return true if and only if the string is non-null and begins with "ID:"
+     */
+    public boolean hasMessageIdPrefix(String string) {
+        return string != null && string.startsWith(JMS_ID_PREFIX);
+    }
+
+    /**
+     * Removes one leading "ID:" prefix from the given string, when it has one.
+     *
+     * @param id the string to process, may be null
+     * @return the remainder after the prefix, or the argument itself (null included)
+     *         when it does not start with "ID:"
+     */
+    public String stripMessageIdPrefix(String id) {
+        return hasMessageIdPrefix(id) ? strip(id, JMS_ID_PREFIX_LENGTH) : id;
+    }
+
+    // Drops the first numChars characters of id.
+    private String strip(String id, int numChars) {
+        return id.substring(numChars);
+    }
+
+    /**
+     * Converts an AMQP style message-id object into its base string. A type prefix is
+     * added where it is needed to convey the type of the object, or to escape a string
+     * that already looks like an encoded value.
+     *
+     * @param messageId the object to process: a String, UUID, BigInteger, Long or
+     *        ByteBuffer, or null
+     * @return the base string to be used in creating the actual JMS id, or null when
+     *         messageId is null
+     * @throws IllegalArgumentException if messageId is of any other type
+     */
+    public String toBaseMessageIdString(Object messageId) {
+        if (messageId == null) {
+            return null;
+        }
+
+        if (messageId instanceof String) {
+            final String text = (String) messageId;
+
+            // A plain string that starts with any encoding prefix (the string
+            // prefix included) has to be escaped as an encoded string, otherwise
+            // it would be mistaken for an encoded value on the way back.
+            return hasTypeEncodingPrefix(text) ? AMQP_STRING_PREFIX + text : text;
+        }
+
+        if (messageId instanceof UUID) {
+            return AMQP_UUID_PREFIX + messageId.toString();
+        }
+
+        if (messageId instanceof BigInteger || messageId instanceof Long) {
+            return AMQP_ULONG_PREFIX + messageId.toString();
+        }
+
+        if (messageId instanceof ByteBuffer) {
+            // Read through a duplicate so the caller's buffer position is untouched.
+            final ByteBuffer view = ((ByteBuffer) messageId).duplicate();
+
+            final byte[] content = new byte[view.remaining()];
+            view.get(content);
+
+            return AMQP_BINARY_PREFIX + convertBinaryToHexString(content);
+        }
+
+        throw new IllegalArgumentException("Unsupported type provided: " + messageId.getClass());
+    }
+
+    // True when the string starts with any of the four AMQP type encoding prefixes.
+    private boolean hasTypeEncodingPrefix(String stringId) {
+        if (hasAmqpBinaryPrefix(stringId)) {
+            return true;
+        }
+        if (hasAmqpUuidPrefix(stringId)) {
+            return true;
+        }
+        if (hasAmqpUlongPrefix(stringId)) {
+            return true;
+        }
+        return hasAmqpStringPrefix(stringId);
+    }
+
+    private boolean hasAmqpStringPrefix(String stringId) {
+        return stringId.startsWith(AMQP_STRING_PREFIX);
+    }
+
+    private boolean hasAmqpUlongPrefix(String stringId) {
+        return stringId.startsWith(AMQP_ULONG_PREFIX);
+    }
+
+    private boolean hasAmqpUuidPrefix(String stringId) {
+        return stringId.startsWith(AMQP_UUID_PREFIX);
+    }
+
+    private boolean hasAmqpBinaryPrefix(String stringId) {
+        return stringId.startsWith(AMQP_BINARY_PREFIX);
+    }
+
+    /**
+     * Converts a base id string into the matching AMQP style message-id object, choosing
+     * the type from the encoding prefix (if any) at the start of the string.
+     *
+     * @param baseId the base id string to be converted, may be null
+     * @return a UUID, BigInteger, String or ByteBuffer according to the prefix; the
+     *         unchanged string when it has no type prefix; null when baseId is null
+     * @throws IdConversionException if the provided baseId String indicates an encoded
+     *         type but can't be converted to that type.
+     */
+    public Object toIdObject(String baseId) throws IdConversionException {
+        if (baseId == null) {
+            return null;
+        }
+
+        try {
+            if (hasAmqpUuidPrefix(baseId)) {
+                return UUID.fromString(strip(baseId, AMQP_UUID_PREFIX_LENGTH));
+            }
+
+            if (hasAmqpUlongPrefix(baseId)) {
+                return new BigInteger(strip(baseId, AMQP_ULONG_PREFIX_LENGTH));
+            }
+
+            if (hasAmqpStringPrefix(baseId)) {
+                // Escaped string: only the escape prefix is removed.
+                return strip(baseId, AMQP_STRING_PREFIX_LENGTH);
+            }
+
+            if (hasAmqpBinaryPrefix(baseId)) {
+                final String hex = strip(baseId, AMQP_BINARY_PREFIX_LENGTH);
+                return ByteBuffer.wrap(convertHexStringToBinary(hex));
+            }
+
+            // We have a string without any type prefix, transmit it as-is.
+            return baseId;
+        } catch (IllegalArgumentException e) {
+            // Also covers NumberFormatException, which is an IllegalArgumentException.
+            throw new IdConversionException("Unable to convert ID value", e);
+        }
+    }
+
+    /**
+     * Decodes a hex string into bytes, each byte being built from two consecutive
+     * characters of the string.
+     *
+     * The hex characters may be upper or lower case.
+     *
+     * @param hexString string to convert
+     * @return a byte array containing the binary representation
+     * @throws IllegalArgumentException if the provided String is a non-even length or contains non-hex characters
+     */
+    public byte[] convertHexStringToBinary(String hexString) throws IllegalArgumentException {
+        final int charCount = hexString.length();
+
+        // Two characters encode one byte, so an odd number of characters cannot be valid.
+        if (charCount % 2 != 0) {
+            throw new IllegalArgumentException("The provided hex String must be an even length, but was of length " + charCount + ": " + hexString);
+        }
+
+        final byte[] result = new byte[charCount / 2];
+
+        for (int byteIndex = 0; byteIndex < result.length; byteIndex++) {
+            final int charIndex = byteIndex * 2;
+
+            final int upperNibble = hexCharToInt(hexString.charAt(charIndex), hexString);
+            final int lowerNibble = hexCharToInt(hexString.charAt(charIndex + 1), hexString);
+
+            // The nibbles occupy disjoint bits, so OR-ing them combines them.
+            result[byteIndex] = (byte) ((upperNibble << 4) | lowerNibble);
+        }
+
+        return result;
+    }
+
+    // Maps one hex digit (either case) to its value 0-15; orig is only used for the error message.
+    private int hexCharToInt(char ch, String orig) throws IllegalArgumentException {
+        if ('0' <= ch && ch <= '9') {
+            // distance from '0' is the digit value
+            return ch - '0';
+        }
+
+        if ('A' <= ch && ch <= 'F') {
+            // distance from 'A', shifted by 10 because 'A' stands for ten
+            return ch - 'A' + 10;
+        }
+
+        if ('a' <= ch && ch <= 'f') {
+            // distance from 'a', shifted by 10 because 'a' stands for ten
+            return ch - 'a' + 10;
+        }
+
+        throw new IllegalArgumentException("The provided hex string contains non-hex character '" + ch + "': " + orig);
+    }
+
+    /**
+     * Encodes bytes as a hex string in which every character stands for 4 bits of the
+     * input, i.e. each byte yields two characters.
+     *
+     * The returned hex characters are upper-case.
+     *
+     * @param bytes binary to convert
+     * @return a String containing a hex representation of the bytes
+     */
+    public String convertBinaryToHexString(byte[] bytes) {
+        // Each byte is represented as 2 chars
+        final char[] hex = new char[bytes.length * 2];
+        int pos = 0;
+
+        for (final byte b : bytes) {
+            // The byte is widened to int before the shift, which replicates the
+            // sign bit, so everything above the low 4 bits is masked off afterwards.
+            hex[pos++] = HEX_CHARS[(b >> 4) & 0xF];
+            hex[pos++] = HEX_CHARS[b & 0xF];
+        }
+
+        return new String(hex);
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpMessageSupport.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpMessageSupport.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpMessageSupport.java
new file mode 100644
index 0000000..a01d415
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpMessageSupport.java
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.provider.amqp.message;
+
+import javax.jms.Destination;
+import javax.jms.Queue;
+import javax.jms.TemporaryQueue;
+import javax.jms.TemporaryTopic;
+import javax.jms.Topic;
+
+import org.apache.qpid.proton.amqp.Symbol;
+
+/**
+ * Holder of the constant values and static helper methods used when mapping
+ * to and from the AMQP Message types being sent or received.
+ */
+public final class AmqpMessageSupport {
+
+    /**
+     * Name of the annotation that stores the destination name the Message will
+     * be sent to.  The Message should additionally be tagged with the matching
+     * destination attribute so that the receiver can work out the correct
+     * destination type.
+     */
+    public static final String AMQP_TO_ANNOTATION = "x-opt-to-type";
+
+    /**
+     * Name of the annotation that stores the destination name the sender wants
+     * replies delivered to.  The Message should additionally be tagged with the
+     * matching destination attribute so that the receiver can work out the
+     * correct destination type.
+     */
+    public static final String AMQP_REPLY_TO_ANNOTATION = "x-opt-reply-type";
+
+    /**
+     * Attribute that marks a destination as temporary.
+     */
+    public static final String TEMPORARY_ATTRIBUTE = "temporary";
+
+    /**
+     * Attribute that marks a destination as being of the Queue type.
+     */
+    public static final String QUEUE_ATTRIBUTES = "queue";
+
+    /**
+     * Attribute that marks a destination as being of the Topic type.
+     */
+    public static final String TOPIC_ATTRIBUTES = "topic";
+
+    /**
+     * Convenience value that marks a destination as a Temporary Queue: the
+     * temporary attribute and the queue attribute joined by a comma.
+     */
+    public static final String TEMP_QUEUE_ATTRIBUTES = TEMPORARY_ATTRIBUTE + "," + QUEUE_ATTRIBUTES;
+
+    /**
+     * Convenience value that marks a destination as a Temporary Topic: the
+     * temporary attribute and the topic attribute joined by a comma.
+     */
+    public static final String TEMP_TOPIC_ATTRIBUTES = TEMPORARY_ATTRIBUTE + "," + TOPIC_ATTRIBUTES;
+
+    /**
+     * Attribute that marks the Application defined correlation Id which has
+     * been set for the message.
+     */
+    public static final String JMS_APP_CORRELATION_ID = "x-opt-app-correlation-id";
+
+    /**
+     * Attribute that marks the JMSType value set on the message.
+     */
+    public static final String JMS_TYPE = "x-opt-jms-type";
+
+    /**
+     * Attribute that marks which JMS message type the message instance
+     * represents; see the JMS_*_MESSAGE values below.
+     */
+    public static final String JMS_MSG_TYPE = "x-opt-jms-msg-type";
+
+    /**
+     * JMS_MSG_TYPE value indicating a generic JMS Message, which has no body.
+     */
+    public static final byte JMS_MESSAGE = 0;
+
+    /**
+     * JMS_MSG_TYPE value indicating a JMS ObjectMessage, which has an Object
+     * value serialized in its message body.
+     */
+    public static final byte JMS_OBJECT_MESSAGE = 1;
+
+    /**
+     * JMS_MSG_TYPE value indicating a JMS MapMessage, which has a Map instance
+     * serialized in its message body.
+     */
+    public static final byte JMS_MAP_MESSAGE = 2;
+
+    /**
+     * JMS_MSG_TYPE value indicating a JMS BytesMessage, which has a body that
+     * consists of raw bytes.
+     */
+    public static final byte JMS_BYTES_MESSAGE = 3;
+
+    /**
+     * JMS_MSG_TYPE value indicating a JMS StreamMessage, which has a body that
+     * is a structured collection of primitive values.
+     */
+    public static final byte JMS_STREAM_MESSAGE = 4;
+
+    /**
+     * JMS_MSG_TYPE value indicating a JMS TextMessage, which has a body that
+     * contains a UTF-8 encoded String.
+     */
+    public static final byte JMS_TEXT_MESSAGE = 5;
+
+    // String keys whose use is not shown in this class.
+    // NOTE(review): presumably names of JMS message properties -- confirm against callers.
+    public static final String JMS_AMQP_TTL = "JMS_AMQP_TTL";
+    public static final String JMS_AMQP_REPLY_TO_GROUP_ID = "JMS_AMQP_REPLY_TO_GROUP_ID";
+    public static final String JMS_AMQP_TYPED_ENCODING = "JMS_AMQP_TYPED_ENCODING";
+
+    /**
+     * Obtains the Proton Symbol instance that corresponds to the given key.
+     *
+     * @param key
+     *        the String value name of the Symbol to locate.
+     *
+     * @return the Symbol value that matches the given key.
+     */
+    public static Symbol getSymbol(String key) {
+        return Symbol.valueOf(key);
+    }
+
+    /**
+     * Works out, for a JMS Destination object, the message annotation value that
+     * identifies the type of Destination the name represents: Queue, Topic, etc.
+     *
+     * @param destination
+     *        The JMS Destination to be examined.
+     *
+     * @return the message annotation value describing the given Destination, or
+     *         null when it is neither a Queue nor a Topic (a null argument included).
+     */
+    public static String destinationAttributes(Destination destination) {
+        // Queue is tested before Topic, so an object that implements both
+        // interfaces is described as a queue.
+        if (destination instanceof Queue) {
+            return destination instanceof TemporaryQueue ? TEMP_QUEUE_ATTRIBUTES : QUEUE_ATTRIBUTES;
+        }
+
+        if (destination instanceof Topic) {
+            return destination instanceof TemporaryTopic ? TEMP_TOPIC_ATTRIBUTES : TOPIC_ATTRIBUTES;
+        }
+
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpObjectTypeDelegate.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpObjectTypeDelegate.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpObjectTypeDelegate.java
new file mode 100644
index 0000000..cfa6237
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpObjectTypeDelegate.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.provider.amqp.message;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+/**
+ * Interface for a Delegate object that handles storing and retrieving the Object
+ * value in an Object message. Each implementation decides how the value is
+ * represented inside the AMQP message.
+ */
+public interface AmqpObjectTypeDelegate {
+
+    /**
+     * Given a serializable instance, store the value into the AMQP message using
+     * the strategy implemented by this delegate.
+     *
+     * @param value
+     *        A serializable object instance to be stored in the message.
+     *        NOTE(review): whether null is accepted is left to the implementation.
+     *
+     * @throws IOException if an error occurs during the store operation.
+     */
+    void setObject(Serializable value) throws IOException;
+
+    /**
+     * Read a Serialized object from the AMQP message using the strategy implemented
+     * by this delegate.
+     *
+     * @return an Object that has been read from the stored object data in the message.
+     *         NOTE(review): whether null can be returned is left to the implementation.
+     *
+     * @throws IOException if an error occurs while reading the stored object.
+     * @throws ClassNotFoundException if no class can be found for the stored type.
+     */
+    Serializable getObject() throws IOException, ClassNotFoundException;
+
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpSerializedObjectDelegate.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpSerializedObjectDelegate.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpSerializedObjectDelegate.java
new file mode 100644
index 0000000..72db9dc
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpSerializedObjectDelegate.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.provider.amqp.message;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+
+import org.apache.qpid.jms.util.ClassLoadingAwareObjectInputStream;
+import org.apache.qpid.proton.amqp.Binary;
+import org.apache.qpid.proton.amqp.messaging.Data;
+import org.apache.qpid.proton.amqp.messaging.Section;
+import org.apache.qpid.proton.message.Message;
+
+/**
+ * Wrapper around an AMQP Message instance that will be treated as a JMS ObjectMessage
+ * type.
+ */
+public class AmqpSerializedObjectDelegate implements AmqpObjectTypeDelegate {
+
+    public static final String CONTENT_TYPE = "application/x-java-serialized-object";
+
+    private final Message message;
+
+    /**
+     * Create a new delegate that uses Java serialization to store the message content.
+     *
+     * @param message
+     *        the AMQP message instance where the object is to be stored / read.
+     */
+    public AmqpSerializedObjectDelegate(Message message) {
+        this.message = message;
+        this.message.setContentType(CONTENT_TYPE);
+    }
+
+    @Override
+    public Serializable getObject() throws IOException, ClassNotFoundException {
+        Binary bin = null;
+
+        Section body = message.getBody();
+        if (body == null) {
+            return null;
+        } else if (body instanceof Data) {
+            bin = ((Data) body).getValue();
+        } else {
+            throw new IllegalStateException("Unexpected body type: " + body.getClass().getSimpleName());
+        }
+
+        if (bin == null) {
+            return null;
+        } else {
+            Serializable serialized = null;
+
+            try (ByteArrayInputStream bais = new ByteArrayInputStream(bin.getArray(), bin.getArrayOffset(), bin.getLength());
+                 ClassLoadingAwareObjectInputStream objIn = new ClassLoadingAwareObjectInputStream(bais)) {
+
+                serialized = (Serializable) objIn.readObject();
+            }
+
+            return serialized;
+        }
+    }
+
+    @Override
+    public void setObject(Serializable value) throws IOException {
+        if(value == null) {
+            // TODO: verify whether not sending a body is ok,
+            //       send a serialized null instead if it isn't
+            message.setBody(null);
+        } else {
+            try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
+                 ObjectOutputStream oos = new ObjectOutputStream(baos)) {
+
+               oos.writeObject(value);
+               oos.flush();
+               oos.close();
+
+               byte[] bytes = baos.toByteArray();
+               message.setBody(new Data(new Binary(bytes)));
+            }
+        }
+
+        // TODO: ensure content type is [still] set?
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org