You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2014/09/23 20:20:41 UTC
[17/27] Initial drop of donated AMQP Client Code.
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacade.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacade.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacade.java
new file mode 100644
index 0000000..1131829
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacade.java
@@ -0,0 +1,674 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.provider.amqp.message;
+
+import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.JMS_AMQP_TTL;
+import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.JMS_MESSAGE;
+import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.JMS_MSG_TYPE;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import javax.jms.JMSException;
+import javax.jms.MessageFormatException;
+
+import org.apache.qpid.jms.JmsDestination;
+import org.apache.qpid.jms.message.facade.JmsMessageFacade;
+import org.apache.qpid.jms.meta.JmsMessageId;
+import org.apache.qpid.jms.provider.amqp.AmqpConnection;
+import org.apache.qpid.proton.Proton;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.UnsignedByte;
+import org.apache.qpid.proton.amqp.UnsignedInteger;
+import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
+import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
+import org.apache.qpid.proton.message.Message;
+
+/**
+ *
+ */
+public class AmqpJmsMessageFacade implements JmsMessageFacade {
+
+ private static final int DEFAULT_PRIORITY = javax.jms.Message.DEFAULT_PRIORITY;
+ private static final Charset UTF8 = Charset.forName("UTF-8");
+ private static final long MAX_UINT = 0xFFFFFFFFL;
+
+ protected final Message message;
+ protected final AmqpConnection connection;
+
+ private MessageAnnotations annotations;
+ private Map<Symbol,Object> annotationsMap;
+ private Map<String,Object> propertiesMap;
+
+ private JmsDestination replyTo;
+ private JmsDestination destination;
+
+ private Long syntheticTTL;
+
+ /**
+ * Used to record the value of JMS_AMQP_TTL property
+ * if it is explicitly set by the application
+ */
+ private Long userSpecifiedTTL = null;
+
+ /**
+ * Create a new AMQP Message Facade with an empty message instance.
+ */
+ public AmqpJmsMessageFacade(AmqpConnection connection) {
+ this.message = Proton.message();
+ this.message.setDurable(true);
+
+ this.connection = connection;
+ setAnnotation(JMS_MSG_TYPE, JMS_MESSAGE);
+ }
+
+ /**
+ * Creates a new Facade around an incoming AMQP Message for dispatch to the
+ * JMS Consumer instance.
+ *
+ * @param connection
+ * the connection that created this Facade.
+ * @param message
+ * the incoming Message instance that is being wrapped.
+ */
+ @SuppressWarnings("unchecked")
+ public AmqpJmsMessageFacade(AmqpConnection connection, Message message) {
+ this.message = message;
+ this.connection = connection;
+
+ annotations = message.getMessageAnnotations();
+ if (annotations != null) {
+ annotationsMap = annotations.getValue();
+ }
+
+ if (message.getApplicationProperties() != null) {
+ propertiesMap = message.getApplicationProperties().getValue();
+ }
+
+ Long ttl = message.getTtl();
+ Long absoluteExpiryTime = getAbsoluteExpiryTime();
+ if (absoluteExpiryTime == null && ttl != null) {
+ syntheticTTL = System.currentTimeMillis() + ttl;
+ }
+
+ // TODO - Set destination
+ // TODO - Set replyTo
+ }
+
+ /**
+ * @return the appropriate byte value that indicates the type of message this is.
+ */
+ public byte getJmsMsgType() {
+ return JMS_MESSAGE;
+ }
+
+ /**
+ * Returns the content type value currently set on the wrapped AMQP message, or null
+ * if none has been set. Subclasses should override this to return the correct content
+ * type value for their payload.
+ *
+ * @return a String value indicating the message content type.
+ */
+ public String getContentType() {
+ return message.getContentType();
+ }
+
+ public void setContentType(String value) {
+ message.setContentType(value);
+ }
+
+ @Override
+ public boolean isEmpty() {
+ return true;
+ }
+
+ /**
+  * Returns an unmodifiable snapshot of the application properties currently set
+  * on this message.
+  * <p>
+  * The backing map is only read here; it is never re-created, so calling this
+  * method does not discard properties that were set earlier.
+  *
+  * @return a read-only copy of the application properties, empty if none are set.
+  */
+ @Override
+ public Map<String, Object> getProperties() throws JMSException {
+     if (propertiesMap == null) {
+         // Nothing set yet: answer with an empty view without touching the AMQP message.
+         return Collections.unmodifiableMap(new HashMap<String, Object>());
+     }
+
+     return Collections.unmodifiableMap(new HashMap<String, Object>(propertiesMap));
+ }
+
+ @Override
+ public boolean propertyExists(String key) throws JMSException {
+ return AmqpJmsMessagePropertyIntercepter.getProperty(this, key) != null;
+ }
+
+ /**
+  * Checks the application properties map (and only that map) for the given key.
+  *
+  * @param key
+  *        the application property name to look for.
+  *
+  * @return true if the application properties have been created and contain the key.
+  */
+ public boolean applicationPropertyExists(String key) throws JMSException {
+     return propertiesMap != null && propertiesMap.containsKey(key);
+ }
+
+ /**
+ * Returns a set of all the property names that have been set in this message.
+ *
+ * @return a set of property names in the message or an empty set if none are set.
+ */
+ public Set<String> getPropertyNames() {
+ Set<String> properties = AmqpJmsMessagePropertyIntercepter.getPropertyNames(this);
+ if (propertiesMap != null) {
+ properties.addAll(propertiesMap.keySet());
+ }
+ return properties;
+ }
+
+ @Override
+ public Object getProperty(String key) throws JMSException {
+ return AmqpJmsMessagePropertyIntercepter.getProperty(this, key);
+ }
+
+ /**
+  * Reads a value directly from the application properties map.
+  *
+  * @param key
+  *        the application property name to read.
+  *
+  * @return the stored value, or null if the map has not been created or has no such key.
+  */
+ public Object getApplicationProperty(String key) throws JMSException {
+     return propertiesMap == null ? null : propertiesMap.get(key);
+ }
+
+ @Override
+ public void setProperty(String key, Object value) throws JMSException {
+ if (key == null) {
+ throw new IllegalArgumentException("Property key must not be null");
+ }
+
+ AmqpJmsMessagePropertyIntercepter.setProperty(this, key, value);
+ }
+
+ public void setApplicationProperty(String key, Object value) throws JMSException {
+ if (propertiesMap == null) {
+ lazyCreateProperties();
+ }
+
+ propertiesMap.put(key, value);
+ }
+
+ @Override
+ public void onSend() throws JMSException {
+ String contentType = getContentType();
+ byte jmsMsgType = getJmsMsgType();
+
+ if (contentType != null) {
+ message.setContentType(contentType);
+ }
+ setAnnotation(JMS_MSG_TYPE, jmsMsgType);
+ }
+
+ @Override
+ public void clearBody() {
+ message.setBody(null);
+ }
+
+ /**
+  * Clears the application properties together with the AMQP specific values that
+  * this facade exposes as message properties (TTL override, reply-to-group-id,
+  * user-id, group-id and group-sequence).
+  */
+ @Override
+ public void clearProperties() {
+     // Must delegate to the application-properties helper: calling clearProperties()
+     // from here recursed without end and overflowed the stack.
+     clearAllApplicationProperties();
+
+     // The JMS_AMQP_TTL override is surfaced as a property, so it is reset as well.
+     userSpecifiedTTL = null;
+
+     message.setReplyToGroupId(null);
+     message.setUserId(null);
+     message.setGroupId(null);
+     setGroupSequence(0);
+
+     // TODO - Clear others as needed.
+ }
+
+ @Override
+ public JmsMessageFacade copy() throws JMSException {
+ AmqpJmsMessageFacade copy = new AmqpJmsMessageFacade(connection, message);
+ copyInto(copy);
+ return copy;
+ }
+
+ protected void copyInto(AmqpJmsMessageFacade target) {
+ // TODO - Copy message.
+ }
+
+ @Override
+ public JmsMessageId getMessageId() {
+ Object result = message.getMessageId();
+ if (result != null) {
+ if (result instanceof String) {
+ return new JmsMessageId((String) result);
+ } else {
+ // TODO
+ throw new RuntimeException("No support for non-String IDs yet.");
+ }
+ }
+
+ //TODO: returning a null JmsMessageId object leads to NPE during delivery processing
+ return null;
+ }
+
+ @Override
+ public void setMessageId(JmsMessageId messageId) {
+ if (messageId != null) {
+ message.setMessageId(messageId.toString());
+ } else {
+ message.setMessageId(null);
+ }
+ }
+
+ @Override
+ public long getTimestamp() {
+ if (message.getProperties() != null) {
+ Date timestamp = message.getProperties().getCreationTime();
+ if (timestamp != null) {
+ return timestamp.getTime();
+ }
+ }
+
+ return 0L;
+ }
+
+ /**
+  * Sets the creation time of the message.
+  *
+  * @param timestamp
+  *        the creation time in milliseconds; zero removes any existing value.
+  */
+ @Override
+ public void setTimestamp(long timestamp) {
+     if (timestamp != 0) {
+         // Set through the Message so the value is not lost when the message
+         // has no Properties section yet (as is the case for a new message).
+         message.setCreationTime(timestamp);
+     } else if (message.getProperties() != null) {
+         // Only an existing Properties section can hold a value that needs clearing.
+         message.getProperties().setCreationTime(null);
+     }
+ }
+
+ @Override
+ public String getCorrelationId() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public void setCorrelationId(String correlationId) {
+ // TODO Auto-generated method stub
+
+ }
+
+ /**
+  * Returns the correlation-id as a byte array when the underlying AMQP value is binary.
+  *
+  * @return a copy of the binary correlation-id, or null if no correlation-id is set.
+  *
+  * @throws JMSException if a correlation-id is present but is not a ByteBuffer.
+  */
+ @Override
+ public byte[] getCorrelationIdBytes() throws JMSException {
+     Object rawId = message.getCorrelationId();
+     if (rawId == null) {
+         return null;
+     }
+
+     if (!(rawId instanceof ByteBuffer)) {
+         // TODO - Do we need to throw here, or could we just stringify whatever is in
+         //        there and return the UTF-8 bytes? This method is pretty useless so
+         //        maybe we just return something and let the user sort it out if they
+         //        really think they need this.
+         throw new JMSException("The underlying correlation-id is not binary and so can't be returned");
+     }
+
+     // Read through a duplicate so the position of the stored buffer is left untouched.
+     ByteBuffer view = ((ByteBuffer) rawId).duplicate();
+     byte[] copy = new byte[view.remaining()];
+     view.get(copy);
+     return copy;
+ }
+
+ @Override
+ public void setCorrelationIdBytes(byte[] correlationId) {
+ if (correlationId == null) {
+ message.setCorrelationId(correlationId);
+ } else {
+ byte[] bytes = Arrays.copyOf(correlationId, correlationId.length);
+ message.setCorrelationId(ByteBuffer.wrap(bytes));
+ }
+ }
+
+ @Override
+ public boolean isPersistent() {
+ return message.isDurable();
+ }
+
+ @Override
+ public void setPersistent(boolean value) {
+ this.message.setDurable(value);
+ }
+
+ @Override
+ public int getRedeliveryCounter() {
+ if (message.getHeader() != null) {
+ UnsignedInteger count = message.getHeader().getDeliveryCount();
+ if (count != null) {
+ return count.intValue();
+ }
+ }
+
+ return 0;
+ }
+
+ @Override
+ public void setRedeliveryCounter(int redeliveryCount) {
+ if (redeliveryCount == 0) {
+ if (message.getHeader() != null) {
+ message.getHeader().setDeliveryCount(null);
+ }
+ } else {
+ message.setDeliveryCount(redeliveryCount);
+ }
+ }
+
+ @Override
+ public boolean isRedelivered() {
+ return getRedeliveryCounter() > 0;
+ }
+
+ /**
+  * Marks the message as redelivered or not by adjusting the redelivery counter.
+  * The counter is only touched when the requested state differs from the current one.
+  *
+  * @param redelivered
+  *        true to mark the message as redelivered, false to clear the marking.
+  */
+ @Override
+ public void setRedelivered(boolean redelivered) {
+     if (isRedelivered() == redelivered) {
+         // Already in the requested state, keep whatever count is recorded.
+         return;
+     }
+
+     setRedeliveryCounter(redelivered ? 1 : 0);
+ }
+
+ /**
+  * Returns the JMSType value, which is carried in the AMQP message subject field.
+  * <p>
+  * The JMS_MSG_TYPE annotation is not used for this: it holds the byte marker that
+  * identifies the kind of JMS message, so casting it to a String fails.
+  *
+  * @return the JMSType value, or null if none is set.
+  */
+ @Override
+ public String getType() {
+     return message.getSubject();
+ }
+
+ /**
+  * Sets the JMSType value by storing it in the AMQP message subject field, leaving
+  * the JMS_MSG_TYPE annotation (the message kind marker) untouched.
+  *
+  * @param type
+  *        the JMSType value to store, null clears it.
+  */
+ @Override
+ public void setType(String type) {
+     message.setSubject(type);
+ }
+
+ @Override
+ public byte getPriority() {
+ if (message.getHeader() != null) {
+ UnsignedByte priority = message.getHeader().getPriority();
+ if (priority != null) {
+ return priority.byteValue();
+ }
+ }
+
+ return DEFAULT_PRIORITY;
+ }
+
+ @Override
+ public void setPriority(byte priority) {
+ if (priority == DEFAULT_PRIORITY) {
+ if (message.getHeader() == null) {
+ return;
+ } else {
+ message.getHeader().setPriority(null);
+ }
+ } else {
+ message.setPriority(priority);
+ }
+ }
+
+ @Override
+ public long getExpiration() {
+ Long absoluteExpiry = getAbsoluteExpiryTime();
+ if (absoluteExpiry != null) {
+ return absoluteExpiry;
+ }
+
+ if (syntheticTTL != null) {
+ return syntheticTTL;
+ }
+
+ return 0;
+ }
+
+ @Override
+ public void setExpiration(long expiration) {
+ syntheticTTL = null;
+
+ if (expiration != 0) {
+ setAbsoluteExpiryTime(expiration);
+ } else {
+ setAbsoluteExpiryTime(null);
+ }
+ }
+
+ /**
+  * Records the application supplied JMS_AMQP_TTL value.
+  *
+  * @param value
+  *        the TTL to record; must be a Long in the unsigned 32 bit range.
+  *
+  * @throws MessageFormatException if the value is not a Long or lies outside 0..2^32 - 1.
+  */
+ public void setAmqpTimeToLive(Object value) throws MessageFormatException {
+     Long ttl = null;
+     if (value instanceof Long) {
+         ttl = (Long) value;
+     }
+
+     if (ttl != null && ttl >= 0 && ttl <= MAX_UINT) {
+         userSpecifiedTTL = ttl;
+     } else {
+         // The upper bound enforced above is MAX_UINT (0xFFFFFFFF), i.e. 2^32 - 1.
+         throw new MessageFormatException(JMS_AMQP_TTL + " must be a long with value in range 0 to 2^32 - 1");
+     }
+ }
+
+ /**
+  * Returns the JMS_AMQP_TTL value explicitly set by the application.
+  *
+  * @return the user specified TTL, or zero when none has been set.
+  */
+ public long getAmqpTimeToLive() {
+     // The field is a nullable Long; unboxing it directly threw a
+     // NullPointerException for every message without an explicit TTL.
+     return userSpecifiedTTL != null ? userSpecifiedTTL.longValue() : 0L;
+ }
+
+ @Override
+ public JmsDestination getDestination() {
+ return destination;
+ }
+
+ @Override
+ public void setDestination(JmsDestination destination) {
+ this.destination = destination;
+
+ // TODO
+ }
+
+ @Override
+ public JmsDestination getReplyTo() {
+ return replyTo;
+ }
+
+ @Override
+ public void setReplyTo(JmsDestination replyTo) {
+ this.replyTo = replyTo;
+ // TODO Auto-generated method stub
+ }
+
+ public void setReplyToGroupId(String replyToGroupId) {
+ message.setReplyToGroupId(replyToGroupId);
+ }
+
+ public String getReplyToGroupId() {
+ return message.getReplyToGroupId();
+ }
+
+ /**
+  * Returns the user-id of the message decoded as a UTF-8 string.
+  *
+  * @return the decoded user-id, or null if the message carries none.
+  */
+ @Override
+ public String getUserId() {
+     byte[] raw = message.getUserId();
+     return raw == null ? null : new String(raw, UTF8);
+ }
+
+ @Override
+ public void setUserId(String userId) {
+ message.setUserId(userId.getBytes(UTF8));
+ }
+
+ @Override
+ public String getGroupId() {
+ return message.getGroupId();
+ }
+
+ @Override
+ public void setGroupId(String groupId) {
+ message.setGroupId(groupId);
+ }
+
+ @Override
+ public int getGroupSequence() {
+ if (message.getProperties() != null) {
+ UnsignedInteger sequence = message.getProperties().getGroupSequence();
+ if (sequence != null) {
+ return sequence.intValue();
+ }
+ }
+
+ return 0;
+ }
+
+ /**
+  * Sets the group sequence of the message.
+  *
+  * @param groupSequence
+  *        the sequence value to store; zero (the value reported when nothing is
+  *        set) removes the field from an existing Properties section.
+  */
+ @Override
+ public void setGroupSequence(int groupSequence) {
+     if (groupSequence == 0) {
+         // Zero means "not set": clear the field, but never create the
+         // Properties section just to hold an absent value.
+         if (message.getProperties() != null) {
+             message.getProperties().setGroupSequence(null);
+         }
+     } else {
+         message.setGroupSequence(groupSequence);
+     }
+ }
+
+ /**
+ * @return the true AMQP Message instance wrapped by this Facade.
+ */
+ public Message getAmqpMessage() {
+ return this.message;
+ }
+
+ /**
+ * The AmqpConnection instance that is associated with this Message.
+ * @return the connection this message facade was created with.
+ */
+ public AmqpConnection getConnection() {
+ return connection;
+ }
+
+ /**
+ * Checks for the presence of a given message annotation and returns true
+ * if it is contained in the current annotations. If the annotations have
+ * not yet been initialized then this method always returns false.
+ *
+ * @param key
+ * the name of the annotation to query for.
+ *
+ * @return true if the annotation is present, false if not or if the annotations are not initialized.
+ */
+ boolean annotationExists(String key) {
+ if (annotationsMap == null) {
+ return false;
+ }
+
+ return annotationsMap.containsKey(AmqpMessageSupport.getSymbol(key));
+ }
+
+ /**
+ * Given an annotation name, lookup and return the value associated with that
+ * annotation name. If the message annotations have not been created yet then
+ * this method will always return null.
+ *
+ * @param key
+ * the Symbol name that should be looked up in the message annotations.
+ *
+ * @return the value of the annotation if it exists, or null if not set or not accessible.
+ */
+ Object getAnnotation(String key) {
+ if (annotationsMap == null) {
+ return null;
+ }
+
+ return annotationsMap.get(AmqpMessageSupport.getSymbol(key));
+ }
+
+ /**
+ * Removes a message annotation if the message contains it. Will not do
+ * a lazy create on the message annotations so caller cannot count on the
+ * existence of the message annotations after a call to this method.
+ *
+ * @param key
+ * the annotation key that is to be removed from the current set.
+ */
+ void removeAnnotation(String key) {
+ if (annotationsMap == null) {
+ return;
+ }
+
+ annotationsMap.remove(AmqpMessageSupport.getSymbol(key));
+ }
+
+ /**
+ * Perform a proper annotation set on the AMQP Message based on a Symbol key and
+ * the target value to append to the current annotations.
+ *
+ * @param key
+ * The name of the Symbol whose value is being set.
+ * @param value
+ * The new value to set in the annotations of this message.
+ */
+ void setAnnotation(String key, Object value) {
+ lazyCreateAnnotations();
+ annotationsMap.put(AmqpMessageSupport.getSymbol(key), value);
+ }
+
+ /**
+ * Removes all message annotations from this message.
+ */
+ void clearAnnotations() {
+ annotationsMap = null;
+ annotations = null;
+ message.setMessageAnnotations(null);
+ }
+
+ /**
+ * Removes all application level properties from the Message.
+ */
+ void clearAllApplicationProperties() {
+ propertiesMap = null;
+ message.setApplicationProperties(null);
+ }
+
+ private Long getAbsoluteExpiryTime() {
+ Long result = null;
+ if (message.getProperties() != null) {
+ Date date = message.getProperties().getAbsoluteExpiryTime();
+ if (date != null) {
+ result = date.getTime();
+ }
+ }
+
+ return result;
+ }
+
+ private void setAbsoluteExpiryTime(Long expiration) {
+ if (expiration == null) {
+ if (message.getProperties() != null) {
+ message.getProperties().setAbsoluteExpiryTime(null);
+ }
+ } else {
+ message.setExpiryTime(expiration);
+ }
+ }
+
+ private void lazyCreateAnnotations() {
+ if (annotationsMap == null) {
+ annotationsMap = new HashMap<Symbol,Object>();
+ annotations = new MessageAnnotations(annotationsMap);
+ message.setMessageAnnotations(annotations);
+ }
+ }
+
+ /**
+  * Creates the application properties map and attaches it to the AMQP message, but
+  * only if it does not exist yet. An existing map is left as it is so that values
+  * already set are never discarded.
+  */
+ private void lazyCreateProperties() {
+     if (propertiesMap == null) {
+         propertiesMap = new HashMap<String,Object>();
+         message.setApplicationProperties(new ApplicationProperties(propertiesMap));
+     }
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFactory.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFactory.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFactory.java
new file mode 100644
index 0000000..882c2ac
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFactory.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.provider.amqp.message;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+import javax.jms.JMSException;
+
+import org.apache.qpid.jms.exceptions.JmsExceptionSupport;
+import org.apache.qpid.jms.message.JmsBytesMessage;
+import org.apache.qpid.jms.message.JmsMapMessage;
+import org.apache.qpid.jms.message.JmsMessage;
+import org.apache.qpid.jms.message.JmsMessageFactory;
+import org.apache.qpid.jms.message.JmsObjectMessage;
+import org.apache.qpid.jms.message.JmsStreamMessage;
+import org.apache.qpid.jms.message.JmsTextMessage;
+import org.apache.qpid.jms.message.facade.JmsObjectMessageFacade;
+import org.apache.qpid.jms.message.facade.JmsTextMessageFacade;
+import org.apache.qpid.jms.message.facade.defaults.JmsDefaultBytesMessageFacade;
+import org.apache.qpid.jms.message.facade.defaults.JmsDefaultMapMessageFacade;
+import org.apache.qpid.jms.message.facade.defaults.JmsDefaultMessageFacade;
+import org.apache.qpid.jms.message.facade.defaults.JmsDefaultObjectMessageFacade;
+import org.apache.qpid.jms.message.facade.defaults.JmsDefaultStreamMessageFacade;
+import org.apache.qpid.jms.message.facade.defaults.JmsDefaultTextMessageFacade;
+import org.apache.qpid.jms.provider.amqp.AmqpConnection;
+
+/**
+ * AMQP Message Factory instance used to create new JmsMessage types that wrap an
+ * Proton AMQP Message. This class is used by the JMS layer to create its JMS
+ * Message instances, the messages returned here should be created in a proper
+ * initially empty state for the client to populate.
+ */
+public class AmqpJmsMessageFactory implements JmsMessageFactory {
+
+ private AmqpConnection connection;
+
+ public AmqpJmsMessageFactory() {
+ }
+
+ public AmqpJmsMessageFactory(AmqpConnection connection) {
+ this.connection = connection;
+ }
+
+ public AmqpConnection getAmqpConnection() {
+ return this.connection;
+ }
+
+ public void setAmqpConnection(AmqpConnection connection) {
+ this.connection = connection;
+ }
+
+ @Override
+ public JmsMessage createMessage() throws JMSException {
+ //return new JmsMessage(new AmqpJmsMessageFacade(connection));
+ return new JmsMessage(new JmsDefaultMessageFacade());
+ }
+
+ @Override
+ public JmsTextMessage createTextMessage() throws JMSException {
+ return createTextMessage(null);
+ }
+
+ @Override
+ public JmsTextMessage createTextMessage(String payload) throws JMSException {
+
+ // JmsTextMessageFacade facade = new AmqpJmsTextMessageFacade(connection);
+ JmsTextMessageFacade facade = new JmsDefaultTextMessageFacade();
+
+ if (payload != null) {
+ facade.setText(payload);
+ }
+
+ return new JmsTextMessage(facade);
+ }
+
+ @Override
+ public JmsBytesMessage createBytesMessage() throws JMSException {
+ // return new JmsBytesMessage(new AmqpJmsBytesMessageFacade(connection));
+ return new JmsBytesMessage(new JmsDefaultBytesMessageFacade());
+ }
+
+ @Override
+ public JmsMapMessage createMapMessage() throws JMSException {
+ // return new JmsMapMessage(new AmqpJmsMapMessageFacade(connection));
+ return new JmsMapMessage(new JmsDefaultMapMessageFacade());
+ }
+
+ @Override
+ public JmsStreamMessage createStreamMessage() throws JMSException {
+ // return new JmsStreamMessage(new AmqpJmsStreamMessageFacade(connection));
+ return new JmsStreamMessage(new JmsDefaultStreamMessageFacade());
+ }
+
+ @Override
+ public JmsObjectMessage createObjectMessage() throws JMSException {
+ return createObjectMessage(null);
+ }
+
+ @Override
+ public JmsObjectMessage createObjectMessage(Serializable payload) throws JMSException {
+
+ // JmsObjectMessageFacade facade = new AmqpJmsSerializedObjectMessageFacade(connection);
+ JmsObjectMessageFacade facade = new JmsDefaultObjectMessageFacade();
+
+ if (payload != null) {
+ try {
+ facade.setObject(payload);
+ } catch (IOException e) {
+ throw JmsExceptionSupport.create(e);
+ }
+ }
+
+ return new JmsObjectMessage(facade);
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessagePropertyIntercepter.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessagePropertyIntercepter.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessagePropertyIntercepter.java
new file mode 100644
index 0000000..11d10a3
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessagePropertyIntercepter.java
@@ -0,0 +1,377 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.provider.amqp.message;
+
+import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.JMS_AMQP_REPLY_TO_GROUP_ID;
+import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.JMS_AMQP_TTL;
+import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.JMS_AMQP_TYPED_ENCODING;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import javax.jms.JMSException;
+import javax.jms.MessageFormatException;
+
+import org.apache.qpid.jms.util.TypeConversionSupport;
+
+/**
+ * Utility class used to intercept calls to Message property sets and gets and map the
+ * correct AMQP fields to the property name being accessed.
+ */
+public class AmqpJmsMessagePropertyIntercepter {
+
+ private static final Map<String, PropertyIntercepter> PROPERTY_INTERCEPTERS = new HashMap<String, PropertyIntercepter>();
+
+ /**
+ * Interface for a Property intercepter object used to write JMS style
+ * properties that are part of the JMS Message object members or perform
+ * some needed conversion action before some named property is read or
+ * written. If a property is not writable then the intercepter should
+ * throw a JMSException to indicate the error.
+ */
+ interface PropertyIntercepter {
+
+ /**
+ * Called when the named property is queried from a JMS Message object.
+ *
+ * @param message
+ * The message being acted upon.
+ *
+ * @return the correct property value from the given Message.
+ *
+ * @throws JMSException if an error occurs while accessing the property
+ */
+ Object getProperty(AmqpJmsMessageFacade message) throws JMSException;
+
+ /**
+ * Called when the named property is assigned from a JMS Message object.
+ *
+ * @param message
+ * The message instance being acted upon.
+ * @param value
+ * The value to assign to the intercepted property.
+ *
+ * @throws JMSException if an error occurs writing the property.
+ */
+ void setProperty(AmqpJmsMessageFacade message, Object value) throws JMSException;
+
+ /**
+ * Indicates if the intercepted property has a value currently assigned.
+ *
+ * @param message
+ * The message instance being acted upon.
+ *
+ * @return true if the intercepted property has a value assigned to it.
+ */
+ boolean propertyExists(AmqpJmsMessageFacade message);
+
+ }
+
+ static {
+ PROPERTY_INTERCEPTERS.put(JMS_AMQP_TTL, new PropertyIntercepter() {
+ @Override
+ public Object getProperty(AmqpJmsMessageFacade message) throws JMSException {
+ return message.getAmqpTimeToLive();
+ }
+
+ @Override
+ public void setProperty(AmqpJmsMessageFacade message, Object value) throws JMSException {
+ Long rc = (Long) TypeConversionSupport.convert(value, Long.class);
+ if (rc == null) {
+ throw new JMSException("Property " + JMS_AMQP_TTL + " cannot be set from a " + value.getClass().getName() + ".");
+ }
+ message.setAmqpTimeToLive(rc.longValue());
+ }
+
+ @Override
+ public boolean propertyExists(AmqpJmsMessageFacade message) {
+ return message.getAmqpTimeToLive() != 0;
+ }
+ });
+ PROPERTY_INTERCEPTERS.put(JMS_AMQP_REPLY_TO_GROUP_ID, new PropertyIntercepter() {
+ @Override
+ public Object getProperty(AmqpJmsMessageFacade message) throws JMSException {
+ return message.getReplyToGroupId();
+ }
+
+ @Override
+ public void setProperty(AmqpJmsMessageFacade message, Object value) throws JMSException {
+ String rc = (String) TypeConversionSupport.convert(value, String.class);
+ if (rc == null) {
+ throw new JMSException("Property " + JMS_AMQP_REPLY_TO_GROUP_ID + " cannot be set from a " + value.getClass().getName() + ".");
+ }
+ message.setReplyToGroupId(rc);
+ }
+
+ @Override
+ public boolean propertyExists(AmqpJmsMessageFacade message) {
+ return message.getReplyToGroupId() != null;
+ }
+ });
+ // Intercepter for the JMS_AMQP_TYPED_ENCODING property, which is only meaningful
+ // for object message facades.
+ PROPERTY_INTERCEPTERS.put(JMS_AMQP_TYPED_ENCODING, new PropertyIntercepter() {
+     @Override
+     public Object getProperty(AmqpJmsMessageFacade message) throws JMSException {
+         if (message instanceof AmqpJmsObjectMessageFacade) {
+             return ((AmqpJmsObjectMessageFacade) message).isAmqpTypedEncoding();
+         }
+
+         return false;
+     }
+
+     @Override
+     public void setProperty(AmqpJmsMessageFacade message, Object value) throws JMSException {
+         // The conversion target is Boolean, so the result must be held as a Boolean;
+         // casting it to Integer failed with a ClassCastException.
+         Boolean rc = (Boolean) TypeConversionSupport.convert(value, Boolean.class);
+         if (rc == null) {
+             throw new JMSException("Property " + JMS_AMQP_TYPED_ENCODING + " cannot be set from a " + value.getClass().getName() + ".");
+         }
+
+         // TODO - Finished Typed encoding work.
+         if (message instanceof AmqpJmsObjectMessageFacade) {
+             // ((AmqpJmsSerializedObjectMessageFacade) message)
+         } else {
+             throw new MessageFormatException(JMS_AMQP_TYPED_ENCODING + " is only applicable to ObjectMessage");
+         }
+     }
+
+     @Override
+     public boolean propertyExists(AmqpJmsMessageFacade message) {
+         if (message instanceof AmqpJmsObjectMessageFacade) {
+             // TODO - See notes in AmqpObjectMessageFacade about whether this should
+             //        always be exposed for ObjectMessage or only if it's currently
+             //        the case that the message uses the AMQP typed encoding.
+             return ((AmqpJmsObjectMessageFacade) message).isAmqpTypedEncoding();
+         }
+
+         return false;
+     }
+ });
+ }
+
+ /**
+ * Static get method that takes a property name and gets the value either via
+ * a registered property get object or through the AmqpJmsMessageFacade getProperty
+ * method.
+ *
+ * @param message
+ * the AmqpJmsMessageFacade instance to read from
+ * @param name
+ * the property name that is being requested.
+ *
+ * @return the correct value either mapped to an attribute of a Message or a message property.
+ *
+ * @throws JMSException if an error occurs while reading the defined property.
+ */
+ public static Object getProperty(AmqpJmsMessageFacade message, String name) throws JMSException {
+ Object value = null;
+
+ PropertyIntercepter propertyExpression = PROPERTY_INTERCEPTERS.get(name);
+ if (propertyExpression != null) {
+ value = propertyExpression.getProperty(message);
+ } else {
+ value = message.getApplicationProperty(name);
+ }
+
+ return value;
+ }
+
+ /**
+ * Static set method that takes a property name and sets the value either via
+ * a registered property set object or through the AmqpJmsMessageFacade setProperty
+ * method.
+ *
+ * @param message
+ * the AmqpJmsMessageFacade instance to write to.
+ * @param name
+ * the property name that is being written.
+ * @param value
+ * the new value to assign for the named property.
+ *
+ * @throws JMSException if an error occurs while writing the defined property.
+ */
+ public static void setProperty(AmqpJmsMessageFacade message, String name, Object value) throws JMSException {
+ PropertyIntercepter propertyExpression = PROPERTY_INTERCEPTERS.get(name);
+ if (propertyExpression != null) {
+ propertyExpression.setProperty(message, value);
+ } else {
+ message.setApplicationProperty(name, value);
+ }
+ }
+
+ /**
+  * Static query method to determine if a specific property exists in the given message.
+  * Intercepted property names are answered by their registered intercepter, all other
+  * names are looked up in the application properties of the message.
+  *
+  * @param message
+  *        the AmqpJmsMessageFacade instance to inspect.
+  * @param name
+  *        the property name that is being checked.
+  *
+  * @return true if the named property currently has a value in the given message.
+  *
+  * @throws JMSException if an error occurs while inspecting the defined property.
+  */
+ public static boolean propertyExists(AmqpJmsMessageFacade message, String name) throws JMSException {
+     PropertyIntercepter propertyExpression = PROPERTY_INTERCEPTERS.get(name);
+     if (propertyExpression != null) {
+         return propertyExpression.propertyExists(message);
+     }
+
+     // The outcome of the lookup is handed back to the caller rather than dropped.
+     return message.applicationPropertyExists(name);
+ }
+
+    /**
+     * For each of the currently configured message property intercepter instances a
+     * string key value is inserted into a Set and returned.
+     *
+     * The returned Set is a copy, so modifying it does not add or remove intercepters
+     * from the global set, and later registrations do not alter it.
+     *
+     * @return a Set<String> containing the names of all intercepted properties.
+     */
+    public static Set<String> getAllPropertyNames() {
+        // Copy rather than hand out the live key view of the global intercepter map.
+        return new HashSet<String>(PROPERTY_INTERCEPTERS.keySet());
+    }
+
+    /**
+     * Collects the names of all intercepted properties that the registered intercepter
+     * reports as existing in the given message.
+     *
+     * @param message
+     *        the AmqpJmsMessageFacade instance that each intercepter is asked about.
+     *
+     * @return a new Set<String> containing the names of all intercepted properties with a value.
+     */
+    public static Set<String> getPropertyNames(AmqpJmsMessageFacade message) {
+        final Set<String> present = new HashSet<String>();
+
+        for (Entry<String, PropertyIntercepter> candidate : PROPERTY_INTERCEPTERS.entrySet()) {
+            final PropertyIntercepter intercepter = candidate.getValue();
+            if (!intercepter.propertyExists(message)) {
+                continue;
+            }
+
+            present.add(candidate.getKey());
+        }
+
+        return present;
+    }
+
+    /**
+     * Registers a PropertyIntercepter in the global set under the given property name.
+     * The instance is stored with a plain put on the global map, so it replaces any
+     * intercepter already stored under the same name.
+     *
+     * @param propertyName
+     *        The name of the Message property that will be intercepted.
+     * @param getter
+     *        The PropertyIntercepter instance that should be used for the named property.
+     */
+    public static void addPropertyIntercepter(String propertyName, PropertyIntercepter getter) {
+        PROPERTY_INTERCEPTERS.put(propertyName, getter);
+    }
+
+    /**
+     * Removes the intercepter registered in the global set for the given property name.
+     *
+     * @param propertyName
+     *        The name of the PropertyIntercepter to remove.
+     *
+     * @return true if an intercepter was registered under the name and has been removed.
+     */
+    public boolean removePropertyIntercepter(String propertyName) {
+        return PROPERTY_INTERCEPTERS.remove(propertyName) != null;
+    }
+
+ private final String name;
+ private final PropertyIntercepter propertyExpression;
+
+    /**
+     * Creates a new instance bound to the named property. The intercepter registered
+     * for the name at construction time (if any) is captured; later changes to the
+     * global set are not seen by this instance.
+     *
+     * @param name
+     *        the property value that this getter is assigned to lookup.
+     */
+    public AmqpJmsMessagePropertyIntercepter(String name) {
+        this.propertyExpression = PROPERTY_INTERCEPTERS.get(name);
+        this.name = name;
+    }
+
+    /**
+     * Reads the property this instance is bound to from the given message, using the
+     * captured intercepter when there is one and the application properties otherwise.
+     *
+     * @param message
+     *        the AmqpJmsMessageFacade whose property is being read.
+     *
+     * @return the value supplied by the captured intercepter, or the application property value.
+     *
+     * @throws JMSException if an error occurs while reading the defined property.
+     */
+    public Object get(AmqpJmsMessageFacade message) throws JMSException {
+        if (propertyExpression == null) {
+            return message.getApplicationProperty(name);
+        }
+
+        return propertyExpression.getProperty(message);
+    }
+
+    /**
+     * Writes the property this instance is bound to on the given message, using the
+     * captured intercepter when there is one and the application properties otherwise.
+     *
+     * @param message
+     *        the AmqpJmsMessageFacade whose property is being written.
+     * @param value
+     *        the value to be set on the intercepted AmqpJmsMessageFacade property.
+     *
+     * @throws JMSException if an error occurs while writing the defined property.
+     */
+    public void set(AmqpJmsMessageFacade message, Object value) throws JMSException {
+        if (propertyExpression == null) {
+            message.setApplicationProperty(name, value);
+            return;
+        }
+
+        propertyExpression.setProperty(message, value);
+    }
+
+    /**
+     * Returns the name this instance was constructed with.
+     *
+     * @return the property name that is being intercepted for the AmqpJmsMessageFacade.
+     */
+    public String getName() {
+        return this.name;
+    }
+
+    /**
+     * Returns the intercepted property name as the string form of this instance.
+     *
+     * @see java.lang.Object#toString()
+     */
+    @Override
+    public String toString() {
+        return this.name;
+    }
+
+    /**
+     * Hashes on the intercepted property name only, matching {@link #equals(Object)}.
+     *
+     * @see java.lang.Object#hashCode()
+     */
+    @Override
+    public int hashCode() {
+        return this.name.hashCode();
+    }
+
+    /**
+     * Two instances are equal when they are of exactly the same class and are bound
+     * to the same property name.
+     *
+     * @see java.lang.Object#equals(java.lang.Object)
+     */
+    @Override
+    public boolean equals(Object o) {
+        if (o == null) {
+            return false;
+        }
+
+        if (!this.getClass().equals(o.getClass())) {
+            return false;
+        }
+
+        final AmqpJmsMessagePropertyIntercepter other = (AmqpJmsMessagePropertyIntercepter) o;
+        return name.equals(other.name);
+    }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsObjectMessageFacade.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsObjectMessageFacade.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsObjectMessageFacade.java
new file mode 100644
index 0000000..3696653
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsObjectMessageFacade.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.provider.amqp.message;
+
+import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.JMS_MSG_TYPE;
+import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.JMS_OBJECT_MESSAGE;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+import javax.jms.JMSException;
+
+import org.apache.qpid.jms.exceptions.JmsExceptionSupport;
+import org.apache.qpid.jms.message.facade.JmsObjectMessageFacade;
+import org.apache.qpid.jms.provider.amqp.AmqpConnection;
+import org.apache.qpid.proton.message.Message;
+
+/**
+ * Wrapper around an AMQP Message instance that will be treated as a JMS ObjectMessage
+ * type.
+ *
+ * Reading and writing of the object body is handed off to an AmqpObjectTypeDelegate
+ * that is selected when the facade is constructed.
+ */
+public class AmqpJmsObjectMessageFacade extends AmqpJmsMessageFacade implements JmsObjectMessageFacade {
+
+    private AmqpObjectTypeDelegate delegate;
+
+    /**
+     * Creates a new facade, tags the message with the ObjectMessage type annotation
+     * and installs the serialized object delegate.
+     *
+     * @param connection
+     *        the connection instance that created this facade.
+     */
+    public AmqpJmsObjectMessageFacade(AmqpConnection connection) {
+        super(connection);
+        setAnnotation(JMS_MSG_TYPE, JMS_OBJECT_MESSAGE);
+
+        // TODO Implement Connection property to control default serialization type
+        initDelegate(false);
+    }
+
+    /**
+     * Creates a new facade around an existing AMQP Message and installs the serialized
+     * object delegate.
+     *
+     * @param connection
+     *        the connection that created this facade.
+     * @param message
+     *        the Message instance that is being wrapped.
+     */
+    public AmqpJmsObjectMessageFacade(AmqpConnection connection, Message message) {
+        super(connection, message);
+
+        // TODO detect the content type and init the proper delegate.
+        initDelegate(false);
+    }
+
+    /**
+     * @return the appropriate byte value that indicates the type of message this is.
+     */
+    @Override
+    public byte getJmsMsgType() {
+        return JMS_OBJECT_MESSAGE;
+    }
+
+    /**
+     * @return true if the wrapped AMQP message currently has no body section.
+     */
+    @Override
+    public boolean isEmpty() {
+        // TODO - If null body changes to empty AmqpValue this needs to also change.
+        return getAmqpMessage().getBody() == null;
+    }
+
+    /**
+     * @return true if the body is handled by the AMQP typed delegate rather than the
+     *         serialized object delegate.
+     */
+    public boolean isAmqpTypedEncoding() {
+        // Must test for the concrete typed delegate; testing for the declared field
+        // type AmqpObjectTypeDelegate would be true for every non-null delegate.
+        return this.delegate instanceof AmqpTypedObjectDelegate;
+    }
+
+    /**
+     * Creates a new facade on the same connection, copies this facade's state into it
+     * via copyInto and then stores the current object value on the copy.
+     *
+     * @return the newly created copy.
+     *
+     * @throws JMSException if reading or storing the object value fails.
+     */
+    @Override
+    public JmsObjectMessageFacade copy() throws JMSException {
+        AmqpJmsObjectMessageFacade copy = new AmqpJmsObjectMessageFacade(connection);
+        copyInto(copy);
+
+        try {
+            copy.setObject(getObject());
+        } catch (Exception e) {
+            throw JmsExceptionSupport.create("Failed to copy object value", e);
+        }
+
+        return copy;
+    }
+
+    /**
+     * @return the object value as produced by the active delegate.
+     *
+     * @throws IOException if the delegate fails while reading the body.
+     * @throws ClassNotFoundException if the delegate cannot resolve the class of the stored object.
+     */
+    @Override
+    public Serializable getObject() throws IOException, ClassNotFoundException {
+        return delegate.getObject();
+    }
+
+    /**
+     * Hands the given value to the active delegate for storage in the message.
+     *
+     * @param value
+     *        the new object value, may be null.
+     *
+     * @throws IOException if the delegate fails while writing the body.
+     */
+    @Override
+    public void setObject(Serializable value) throws IOException {
+        delegate.setObject(value);
+    }
+
+    /**
+     * Clears the body by storing a null object value.
+     */
+    @Override
+    public void clearBody() {
+        try {
+            setObject(null);
+        } catch (IOException ignored) {
+            // Best effort: this method declares no checked exceptions, so a failure
+            // while storing the null value is deliberately not propagated.
+        }
+    }
+
+    @Override
+    public void onSend() {
+        // TODO instruct delegate to encode the proper content type into the message.
+    }
+
+    /**
+     * Installs the delegate that reads and writes the object body.
+     *
+     * @param useAmqpTypes
+     *        true to use the AMQP typed delegate, false to use the serialized object delegate.
+     */
+    private void initDelegate(boolean useAmqpTypes) {
+        if (useAmqpTypes) {
+            delegate = new AmqpTypedObjectDelegate(getAmqpMessage());
+        } else {
+            delegate = new AmqpSerializedObjectDelegate(getAmqpMessage());
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsStreamMessageFacade.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsStreamMessageFacade.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsStreamMessageFacade.java
new file mode 100644
index 0000000..0999225
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsStreamMessageFacade.java
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.provider.amqp.message;
+
+import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.JMS_MSG_TYPE;
+import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.JMS_STREAM_MESSAGE;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import javax.jms.MessageEOFException;
+
+import org.apache.qpid.jms.message.facade.JmsStreamMessageFacade;
+import org.apache.qpid.jms.provider.amqp.AmqpConnection;
+import org.apache.qpid.proton.amqp.Binary;
+import org.apache.qpid.proton.amqp.messaging.AmqpValue;
+import org.apache.qpid.proton.amqp.messaging.Section;
+import org.apache.qpid.proton.message.Message;
+
+/**
+ * Wrapper around an AMQP Message instance that will be treated as a JMS StreamMessage
+ * type.
+ *
+ * The stream content is held as a List carried in an amqp-value body section, together
+ * with a read position into that list.
+ */
+public class AmqpJmsStreamMessageFacade extends AmqpJmsMessageFacade implements JmsStreamMessageFacade {
+
+    private List<Object> list;
+    private int position = 0;
+
+    /**
+     * Create a new facade ready for sending.
+     *
+     * @param connection
+     *        the connection instance that created this facade.
+     */
+    public AmqpJmsStreamMessageFacade(AmqpConnection connection) {
+        super(connection);
+        initializeEmptyList();
+        setAnnotation(JMS_MSG_TYPE, JMS_STREAM_MESSAGE);
+    }
+
+    /**
+     * Creates a new Facade around an incoming AMQP Message for dispatch to the
+     * JMS Consumer instance.
+     *
+     * @param connection
+     *        the connection that created this Facade.
+     * @param message
+     *        the incoming Message instance that is being wrapped.
+     *
+     * @throws IllegalStateException if the body is not an amqp-value section, or its
+     *         value is neither null nor a List.
+     */
+    @SuppressWarnings("unchecked")
+    public AmqpJmsStreamMessageFacade(AmqpConnection connection, Message message) {
+        super(connection, message);
+
+        Section body = getAmqpMessage().getBody();
+        if (body == null) {
+            initializeEmptyList();
+        } else if (body instanceof AmqpValue) {
+            Object value = ((AmqpValue) body).getValue();
+
+            if (value == null) {
+                initializeEmptyList();
+            } else if (value instanceof List) {
+                list = (List<Object>) value;
+            } else {
+                throw new IllegalStateException("Unexpected amqp-value body content type: " + value.getClass().getSimpleName());
+            }
+        } else {
+            throw new IllegalStateException("Unexpected message body type: " + body.getClass().getSimpleName());
+        }
+    }
+
+    /**
+     * Creates a new facade on the same connection, copies this facade's state into it
+     * via copyInto and appends all of this facade's stream entries to the copy.
+     *
+     * @return the newly created copy.
+     */
+    @Override
+    public JmsStreamMessageFacade copy() {
+        AmqpJmsStreamMessageFacade copy = new AmqpJmsStreamMessageFacade(connection);
+        copyInto(copy);
+        copy.list.addAll(list);
+        return copy;
+    }
+
+    /**
+     * @return the appropriate byte value that indicates the type of message this is.
+     */
+    @Override
+    public byte getJmsMsgType() {
+        return JMS_STREAM_MESSAGE;
+    }
+
+    /**
+     * @return true if there is an entry at the current read position.
+     */
+    @Override
+    public boolean hasNext() {
+        return !list.isEmpty() && position < list.size();
+    }
+
+    /**
+     * Returns the entry at the current read position without advancing it. Binary
+     * entries are returned as a byte[] copy of the bytes they cover.
+     *
+     * @return the entry at the current read position.
+     *
+     * @throws MessageEOFException if the read position is at or beyond the end of the stream.
+     */
+    @Override
+    public Object peek() throws MessageEOFException {
+        if (list.isEmpty() || position >= list.size()) {
+            throw new MessageEOFException("Attempt to read past end of stream");
+        }
+
+        Object object = list.get(position);
+        if (object instanceof Binary) {
+            // Copy to a byte[], ensure we copy only the required portion. The third
+            // argument of copyOfRange is an exclusive end index, not a length, so it
+            // has to be offset + length.
+            Binary bin = ((Binary) object);
+            int start = bin.getArrayOffset();
+            object = Arrays.copyOfRange(bin.getArray(), start, start + bin.getLength());
+        }
+
+        return object;
+    }
+
+    /**
+     * Advances the read position past the current entry.
+     *
+     * @throws MessageEOFException if the read position is at or beyond the end of the stream.
+     */
+    @Override
+    public void pop() throws MessageEOFException {
+        if (list.isEmpty() || position >= list.size()) {
+            throw new MessageEOFException("Attempt to read past end of stream");
+        }
+
+        position++;
+    }
+
+    /**
+     * Appends a value to the end of the stream. A byte[] value is wrapped in a Binary
+     * before it is stored.
+     *
+     * @param value
+     *        the value to append.
+     */
+    @Override
+    public void put(Object value) {
+        Object entry = value;
+        if (entry instanceof byte[]) {
+            entry = new Binary((byte[]) value);
+        }
+
+        list.add(entry);
+    }
+
+    /**
+     * Moves the read position back to the start of the stream.
+     */
+    @Override
+    public void reset() {
+        position = 0;
+    }
+
+    /**
+     * Removes all entries from the stream and moves the read position back to the start.
+     */
+    @Override
+    public void clearBody() {
+        list.clear();
+        position = 0;
+    }
+
+    /**
+     * @return true if the stream holds no entries.
+     */
+    @Override
+    public boolean isEmpty() {
+        return list.isEmpty();
+    }
+
+    /**
+     * Creates a fresh empty list, keeps it as the backing list of this facade and
+     * installs it as the amqp-value body of the message.
+     */
+    private void initializeEmptyList() {
+        // Assign the field itself; a local variable of the same name would leave the
+        // field null and every later access to the stream would fail.
+        list = new ArrayList<Object>();
+        message.setBody(new AmqpValue(list));
+    }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsTextMessageFacade.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsTextMessageFacade.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsTextMessageFacade.java
new file mode 100644
index 0000000..6c2421b
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsTextMessageFacade.java
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.provider.amqp.message;
+
+import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.JMS_MSG_TYPE;
+import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.JMS_TEXT_MESSAGE;
+
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.nio.charset.CharacterCodingException;
+import java.nio.charset.Charset;
+import java.nio.charset.CharsetDecoder;
+
+import javax.jms.JMSException;
+
+import org.apache.qpid.jms.exceptions.JmsExceptionSupport;
+import org.apache.qpid.jms.message.facade.JmsTextMessageFacade;
+import org.apache.qpid.jms.provider.amqp.AmqpConnection;
+import org.apache.qpid.proton.amqp.Binary;
+import org.apache.qpid.proton.amqp.messaging.AmqpValue;
+import org.apache.qpid.proton.amqp.messaging.Data;
+import org.apache.qpid.proton.amqp.messaging.Section;
+import org.apache.qpid.proton.message.Message;
+
+/**
+ * Wrapper around an AMQP Message instance that will be treated as a JMS TextMessage
+ * type.
+ *
+ * Text is written as an amqp-value body section holding a String. When reading, a
+ * data body section is also accepted and decoded as UTF-8.
+ */
+public class AmqpJmsTextMessageFacade extends AmqpJmsMessageFacade implements JmsTextMessageFacade {
+
+    private static final String UTF_8 = "UTF-8";
+
+    /**
+     * Content type, only to be used when message uses a data
+     * body section, and not when using an amqp-value body section
+     */
+    public static final String CONTENT_TYPE = "text/plain";
+
+    private final CharsetDecoder decoder = Charset.forName(UTF_8).newDecoder();
+
+    /**
+     * Create a new AMQP Message facade ready for sending.
+     *
+     * @param connection
+     *        The AMQP Connection that created this message.
+     */
+    public AmqpJmsTextMessageFacade(AmqpConnection connection) {
+        super(connection);
+        setAnnotation(JMS_MSG_TYPE, JMS_TEXT_MESSAGE);
+        setText(null);
+    }
+
+    /**
+     * Creates a new Facade around an incoming AMQP Message for dispatch to the
+     * JMS Consumer instance.
+     *
+     * @param connection
+     *        the connection that created this Facade.
+     * @param message
+     *        the incoming Message instance that is being wrapped.
+     */
+    public AmqpJmsTextMessageFacade(AmqpConnection connection, Message message) {
+        super(connection, message);
+    }
+
+    /**
+     * @return the appropriate byte value that indicates the type of message this is.
+     */
+    @Override
+    public byte getJmsMsgType() {
+        return JMS_TEXT_MESSAGE;
+    }
+
+    /**
+     * Creates a new facade on the same connection, copies this facade's state into it
+     * via copyInto and then stores the current text on the copy.
+     *
+     * @return the newly created copy.
+     *
+     * @throws JMSException if the current text cannot be read.
+     */
+    @Override
+    public JmsTextMessageFacade copy() throws JMSException {
+        AmqpJmsTextMessageFacade copy = new AmqpJmsTextMessageFacade(connection);
+        copyInto(copy);
+        copy.setText(getText());
+        return copy;
+    }
+
+    /**
+     * Reads the text carried by the message body.
+     *
+     * @return null when there is no body or the amqp-value body holds null, the empty
+     *         String for a data body without content, otherwise the body text.
+     *
+     * @throws JMSException if a data body cannot be decoded as UTF-8.
+     * @throws IllegalStateException if the body is of an unexpected section type, or an
+     *         amqp-value body holds something other than a String.
+     */
+    @Override
+    public String getText() throws JMSException {
+        Section body = getAmqpMessage().getBody();
+
+        if (body == null) {
+            return null;
+        } else if (body instanceof Data) {
+            Binary b = ((Data) body).getValue();
+            if (b == null || b.getLength() == 0) {
+                return "";
+            }
+
+            ByteBuffer buf = ByteBuffer.wrap(b.getArray(), b.getArrayOffset(), b.getLength());
+
+            try {
+                CharBuffer chars = decoder.decode(buf);
+                return String.valueOf(chars);
+            } catch (CharacterCodingException e) {
+                throw JmsExceptionSupport.create("Cannot decode String in UTF-8", e);
+            }
+        } else if (body instanceof AmqpValue) {
+            Object value = ((AmqpValue) body).getValue();
+
+            if (value == null || value instanceof String) {
+                return (String) value;
+            } else {
+                throw new IllegalStateException("Unexpected amqp-value body content type: " + value.getClass().getSimpleName());
+            }
+        } else {
+            throw new IllegalStateException("Unexpected message body type: " + body.getClass().getSimpleName());
+        }
+    }
+
+    /**
+     * Replaces the message body with an amqp-value section holding the given text.
+     *
+     * @param value
+     *        the new text, may be null.
+     */
+    @Override
+    public void setText(String value) {
+        AmqpValue body = new AmqpValue(value);
+        getAmqpMessage().setBody(body);
+    }
+
+    /**
+     * Clears the body by storing a null text value.
+     */
+    @Override
+    public void clearBody() {
+        setText(null);
+    }
+
+    /**
+     * @return true when there is no body, when an amqp-value body holds null (the state
+     *         left by the sending constructor and by clearBody), or when a data body
+     *         has no content.
+     */
+    @Override
+    public boolean isEmpty() {
+        Section body = getAmqpMessage().getBody();
+
+        if (body == null) {
+            return true;
+        } else if (body instanceof AmqpValue) {
+            // setText(null) stores an amqp-value section wrapping null, so a new or
+            // cleared message must be reported as empty here as well.
+            return ((AmqpValue) body).getValue() == null;
+        } else if (body instanceof Data) {
+            Binary b = ((Data) body).getValue();
+            if (b == null || b.getLength() == 0) {
+                return true;
+            }
+        }
+
+        return false;
+    }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpMessageIdHelper.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpMessageIdHelper.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpMessageIdHelper.java
new file mode 100644
index 0000000..0bda795
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpMessageIdHelper.java
@@ -0,0 +1,270 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.jms.provider.amqp.message;
+
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.util.UUID;
+
+import org.apache.qpid.jms.exceptions.IdConversionException;
+
+/**
+ * Helper class for identifying and converting message-id and correlation-id values between
+ * the AMQP types and the String values used by JMS.
+ *
+ * <p>AMQP messages allow for 4 types of message-id/correlation-id: message-id-string, message-id-binary,
+ * message-id-uuid, or message-id-ulong. In order to accept or return a string representation of these
+ * for interoperability with other AMQP clients, the following encoding can be used after removing or
+ * before adding the "ID:" prefix used for a JMSMessageID value<br/>
+ *
+ * "AMQP_BINARY:&lt;hex representation of binary content&gt;"<br/>
+ * "AMQP_UUID:&lt;string representation of uuid&gt;"<br/>
+ * "AMQP_ULONG:&lt;string representation of ulong&gt;"<br/>
+ * "AMQP_STRING:&lt;string&gt;"<br/>
+ *
+ * <p>The AMQP_STRING encoding exists only for escaping message-id-string values that happen to begin
+ * with one of the encoding prefixes (including AMQP_STRING itself). It MUST NOT be used otherwise.
+ *
+ * <p>When provided a string for conversion which attempts to identify itself as an encoded binary, uuid, or
+ * ulong but can't be converted into the indicated format, an exception will be thrown.
+ *
+ */
+public class AmqpMessageIdHelper {
+    public static final String AMQP_STRING_PREFIX = "AMQP_STRING:";
+    public static final String AMQP_UUID_PREFIX = "AMQP_UUID:";
+    public static final String AMQP_ULONG_PREFIX = "AMQP_ULONG:";
+    public static final String AMQP_BINARY_PREFIX = "AMQP_BINARY:";
+    public static final String JMS_ID_PREFIX = "ID:";
+
+    private static final int JMS_ID_PREFIX_LENGTH = JMS_ID_PREFIX.length();
+    private static final int AMQP_UUID_PREFIX_LENGTH = AMQP_UUID_PREFIX.length();
+    private static final int AMQP_ULONG_PREFIX_LENGTH = AMQP_ULONG_PREFIX.length();
+    private static final int AMQP_STRING_PREFIX_LENGTH = AMQP_STRING_PREFIX.length();
+    private static final int AMQP_BINARY_PREFIX_LENGTH = AMQP_BINARY_PREFIX.length();
+    private static final char[] HEX_CHARS = "0123456789ABCDEF".toCharArray();
+
+    /**
+     * Checks whether the given string begins with the "ID:" prefix used to denote a JMSMessageID.
+     *
+     * @param string the string to check, may be null
+     * @return true if and only if the string is non-null and begins with "ID:"
+     */
+    public boolean hasMessageIdPrefix(String string) {
+        return string != null && string.startsWith(JMS_ID_PREFIX);
+    }
+
+    /**
+     * Returns the remainder of the given string after removing a leading "ID:" prefix (if present).
+     *
+     * @param id the string to process
+     * @return the remainder, or the original String if the "ID:" prefix is not present
+     */
+    public String stripMessageIdPrefix(String id) {
+        if (!hasMessageIdPrefix(id)) {
+            return id;
+        }
+
+        return strip(id, JMS_ID_PREFIX_LENGTH);
+    }
+
+    /** Drops the first numChars characters of the given id. */
+    private String strip(String id, int numChars) {
+        return id.substring(numChars);
+    }
+
+    /**
+     * Converts the provided AMQP message-id style object into a base string, adding a
+     * type prefix where that is needed to convey or escape the type of the object.
+     *
+     * @param messageId the object to process
+     * @return the base string to be used in creating the actual JMS id, or null for a null input.
+     * @throws IllegalArgumentException if the object is of a type that is not handled here.
+     */
+    public String toBaseMessageIdString(Object messageId) {
+        if (messageId == null) {
+            return null;
+        }
+
+        if (messageId instanceof String) {
+            String text = (String) messageId;
+
+            // A string that already starts with a type encoding prefix is escaped as an
+            // encoded string, even when that existing prefix is the string one.
+            return hasTypeEncodingPrefix(text) ? AMQP_STRING_PREFIX + text : text;
+        }
+
+        if (messageId instanceof UUID) {
+            return AMQP_UUID_PREFIX + messageId.toString();
+        }
+
+        if (messageId instanceof BigInteger || messageId instanceof Long) {
+            return AMQP_ULONG_PREFIX + messageId.toString();
+        }
+
+        if (messageId instanceof ByteBuffer) {
+            // Read through a duplicate so the position of the caller's buffer is untouched.
+            ByteBuffer view = ((ByteBuffer) messageId).duplicate();
+            byte[] content = new byte[view.remaining()];
+            view.get(content);
+
+            return AMQP_BINARY_PREFIX + convertBinaryToHexString(content);
+        }
+
+        throw new IllegalArgumentException("Unsupported type provided: " + messageId.getClass());
+    }
+
+    /** Tells whether the string starts with any of the four AMQP type encoding prefixes. */
+    private boolean hasTypeEncodingPrefix(String stringId) {
+        if (hasAmqpBinaryPrefix(stringId)) {
+            return true;
+        }
+        if (hasAmqpUuidPrefix(stringId)) {
+            return true;
+        }
+        if (hasAmqpUlongPrefix(stringId)) {
+            return true;
+        }
+        return hasAmqpStringPrefix(stringId);
+    }
+
+    private boolean hasAmqpStringPrefix(String stringId) {
+        return stringId.startsWith(AMQP_STRING_PREFIX);
+    }
+
+    private boolean hasAmqpUlongPrefix(String stringId) {
+        return stringId.startsWith(AMQP_ULONG_PREFIX);
+    }
+
+    private boolean hasAmqpUuidPrefix(String stringId) {
+        return stringId.startsWith(AMQP_UUID_PREFIX);
+    }
+
+    private boolean hasAmqpBinaryPrefix(String stringId) {
+        return stringId.startsWith(AMQP_BINARY_PREFIX);
+    }
+
+    /**
+     * Converts the provided base id string into the matching AMQP message-id style object,
+     * choosing the type from the encoding prefix (if any) found at the start of the string.
+     *
+     * @param baseId the string to be converted
+     * @return a UUID, BigInteger, ByteBuffer or String depending on the prefix, or null for a null input
+     * @throws IdConversionException if the provided baseId String indicates an encoded type but can't be converted to that type.
+     */
+    public Object toIdObject(String baseId) throws IdConversionException {
+        if (baseId == null) {
+            return null;
+        }
+
+        try {
+            if (hasAmqpUuidPrefix(baseId)) {
+                return UUID.fromString(strip(baseId, AMQP_UUID_PREFIX_LENGTH));
+            }
+
+            if (hasAmqpUlongPrefix(baseId)) {
+                return new BigInteger(strip(baseId, AMQP_ULONG_PREFIX_LENGTH));
+            }
+
+            if (hasAmqpStringPrefix(baseId)) {
+                return strip(baseId, AMQP_STRING_PREFIX_LENGTH);
+            }
+
+            if (hasAmqpBinaryPrefix(baseId)) {
+                byte[] content = convertHexStringToBinary(strip(baseId, AMQP_BINARY_PREFIX_LENGTH));
+                return ByteBuffer.wrap(content);
+            }
+
+            // We have a string without any type prefix, transmit it as-is.
+            return baseId;
+        } catch (IllegalArgumentException e) {
+            throw new IdConversionException("Unable to convert ID value", e);
+        }
+    }
+
+    /**
+     * Convert the provided hex-string into a binary representation where each byte represents
+     * two characters of the hex string.
+     *
+     * The hex characters may be upper or lower case.
+     *
+     * @param hexString string to convert
+     * @return a byte array containing the binary representation
+     * @throws IllegalArgumentException if the provided String is a non-even length or contains non-hex characters
+     */
+    public byte[] convertHexStringToBinary(String hexString) throws IllegalArgumentException {
+        final int length = hexString.length();
+
+        // Two characters encode one byte, so an odd number of characters cannot be valid.
+        if (length % 2 != 0) {
+            throw new IllegalArgumentException("The provided hex String must be an even length, but was of length " + length + ": " + hexString);
+        }
+
+        final byte[] binary = new byte[length / 2];
+
+        for (int index = 0; index < binary.length; index++) {
+            final int charPos = index * 2;
+            final char highBitsChar = hexString.charAt(charPos);
+            final char lowBitsChar = hexString.charAt(charPos + 1);
+
+            final int highBits = hexCharToInt(highBitsChar, hexString) << 4;
+            final int lowBits = hexCharToInt(lowBitsChar, hexString);
+
+            binary[index] = (byte) (highBits | lowBits);
+        }
+
+        return binary;
+    }
+
+    /**
+     * Maps a single hex character (either case) to its value 0-15.
+     *
+     * @param ch the character to convert
+     * @param orig the complete string being converted, used only for the error message
+     * @throws IllegalArgumentException if the character is not a hex character
+     */
+    private int hexCharToInt(char ch, String orig) throws IllegalArgumentException {
+        if ('0' <= ch && ch <= '9') {
+            return ch - '0';
+        }
+
+        if ('A' <= ch && ch <= 'F') {
+            // distance from 'A', shifted by the value of 'A' itself (10)
+            return 10 + (ch - 'A');
+        }
+
+        if ('a' <= ch && ch <= 'f') {
+            // distance from 'a', shifted by the value of 'a' itself (10)
+            return 10 + (ch - 'a');
+        }
+
+        throw new IllegalArgumentException("The provided hex string contains non-hex character '" + ch + "': " + orig);
+    }
+
+    /**
+     * Convert the provided binary into a hex-string representation where each character
+     * represents 4 bits of the provided binary, i.e each byte requires two characters.
+     *
+     * The returned hex characters are upper-case.
+     *
+     * @param bytes binary to convert
+     * @return a String containing a hex representation of the bytes
+     */
+    public String convertBinaryToHexString(byte[] bytes) {
+        // Each byte is represented as 2 chars
+        final char[] hex = new char[bytes.length * 2];
+
+        int out = 0;
+        for (int i = 0; i < bytes.length; i++) {
+            // Mask to 0-255 first so the sign bit of the byte never reaches the shift.
+            final int unsigned = bytes[i] & 0xFF;
+
+            hex[out++] = HEX_CHARS[unsigned >>> 4];
+            hex[out++] = HEX_CHARS[unsigned & 0xF];
+        }
+
+        return new String(hex);
+    }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpMessageSupport.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpMessageSupport.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpMessageSupport.java
new file mode 100644
index 0000000..a01d415
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpMessageSupport.java
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.provider.amqp.message;
+
+import javax.jms.Destination;
+import javax.jms.Queue;
+import javax.jms.TemporaryQueue;
+import javax.jms.TemporaryTopic;
+import javax.jms.Topic;
+
+import org.apache.qpid.proton.amqp.Symbol;
+
+/**
+ * Support class containing constant values and static methods that are
+ * used to map to / from AMQP Message types being sent or received.
+ */
+public final class AmqpMessageSupport {
+
+    /**
+     * The Annotation name to store the destination name that the Message
+     * will be sent to. The Message should also be tagged with the appropriate
+     * destination attribute to allow the receiver to determine the correct
+     * destination type.
+     */
+    public static final String AMQP_TO_ANNOTATION = "x-opt-to-type";
+
+    /**
+     * The Annotation name to store the destination name that the sender wants
+     * to receive replies on. The Message should also be tagged with the
+     * appropriate destination attribute to allow the receiver to determine the
+     * correct destination type.
+     */
+    public static final String AMQP_REPLY_TO_ANNOTATION = "x-opt-reply-type";
+
+    /**
+     * Attribute used to mark a destination as temporary.
+     */
+    public static final String TEMPORARY_ATTRIBUTE = "temporary";
+
+    /**
+     * Attribute used to mark a destination as being a Queue type.
+     */
+    public static final String QUEUE_ATTRIBUTES = "queue";
+
+    /**
+     * Attribute used to mark a destination as being a Topic type.
+     */
+    public static final String TOPIC_ATTRIBUTES = "topic";
+
+    /**
+     * Convenience value used to mark a destination as a Temporary Queue.
+     */
+    public static final String TEMP_QUEUE_ATTRIBUTES = TEMPORARY_ATTRIBUTE + "," + QUEUE_ATTRIBUTES;
+
+    /**
+     * Convenience value used to mark a destination as a Temporary Topic.
+     */
+    public static final String TEMP_TOPIC_ATTRIBUTES = TEMPORARY_ATTRIBUTE + "," + TOPIC_ATTRIBUTES;
+
+    /**
+     * Attribute used to mark the Application defined correlation Id that has been
+     * set for the message.
+     */
+    public static final String JMS_APP_CORRELATION_ID = "x-opt-app-correlation-id";
+
+    /**
+     * Attribute used to mark the JMSType value set on the message.
+     */
+    public static final String JMS_TYPE = "x-opt-jms-type";
+
+    /**
+     * Attribute used to mark the JMS Type that the message instance represents.
+     */
+    public static final String JMS_MSG_TYPE = "x-opt-jms-msg-type";
+
+    /**
+     * Value mapping for JMS_MSG_TYPE which indicates the message is a generic JMS Message
+     * which has no body.
+     */
+    public static final byte JMS_MESSAGE = 0;
+
+    /**
+     * Value mapping for JMS_MSG_TYPE which indicates the message is a JMS ObjectMessage
+     * which has an Object value serialized in its message body.
+     */
+    public static final byte JMS_OBJECT_MESSAGE = 1;
+
+    /**
+     * Value mapping for JMS_MSG_TYPE which indicates the message is a JMS MapMessage
+     * which has a Map instance serialized in its message body.
+     */
+    public static final byte JMS_MAP_MESSAGE = 2;
+
+    /**
+     * Value mapping for JMS_MSG_TYPE which indicates the message is a JMS BytesMessage
+     * which has a body that consists of raw bytes.
+     */
+    public static final byte JMS_BYTES_MESSAGE = 3;
+
+    /**
+     * Value mapping for JMS_MSG_TYPE which indicates the message is a JMS StreamMessage
+     * which has a body that is a structured collection of primitive values.
+     */
+    public static final byte JMS_STREAM_MESSAGE = 4;
+
+    /**
+     * Value mapping for JMS_MSG_TYPE which indicates the message is a JMS TextMessage
+     * which has a body that contains a UTF-8 encoded String.
+     */
+    public static final byte JMS_TEXT_MESSAGE = 5;
+
+    /**
+     * Name constant {@code "JMS_AMQP_TTL"}.
+     * NOTE(review): presumably the name of a message property carrying an
+     * AMQP specific time-to-live value; confirm against its usages.
+     */
+    public static final String JMS_AMQP_TTL = "JMS_AMQP_TTL";
+
+    /**
+     * Name constant {@code "JMS_AMQP_REPLY_TO_GROUP_ID"}.
+     * NOTE(review): presumably the name of a message property exposing the
+     * AMQP reply-to-group-id value; confirm against its usages.
+     */
+    public static final String JMS_AMQP_REPLY_TO_GROUP_ID = "JMS_AMQP_REPLY_TO_GROUP_ID";
+
+    /**
+     * Name constant {@code "JMS_AMQP_TYPED_ENCODING"}.
+     * NOTE(review): presumably the name of a message property selecting an
+     * AMQP typed body encoding; confirm against its usages.
+     */
+    public static final String JMS_AMQP_TYPED_ENCODING = "JMS_AMQP_TYPED_ENCODING";
+
+    /**
+     * Not instantiable: this class only exposes constants and static helpers.
+     */
+    private AmqpMessageSupport() {
+    }
+
+    /**
+     * Lookup and return the correct Proton Symbol instance based on the given key.
+     *
+     * @param key
+     *        the String value name of the Symbol to locate.
+     *
+     * @return the Symbol value that matches the given key.
+     */
+    public static Symbol getSymbol(String key) {
+        return Symbol.valueOf(key);
+    }
+
+    /**
+     * Given a JMS Destination object return the correct message annotations that
+     * will identify the type of Destination the name represents: Queue, Topic, etc.
+     *
+     * The Queue check is performed first, so a Destination that is both a Queue and
+     * a Topic is described using the queue attributes.
+     *
+     * @param destination
+     *        The JMS Destination to be examined.
+     *
+     * @return the correct message annotation values to describe the given Destination,
+     *         or null if the Destination is neither a Queue nor a Topic (which
+     *         includes a null Destination).
+     */
+    public static String destinationAttributes(Destination destination) {
+        if (destination instanceof Queue) {
+            return (destination instanceof TemporaryQueue) ? TEMP_QUEUE_ATTRIBUTES : QUEUE_ATTRIBUTES;
+        }
+
+        if (destination instanceof Topic) {
+            return (destination instanceof TemporaryTopic) ? TEMP_TOPIC_ATTRIBUTES : TOPIC_ATTRIBUTES;
+        }
+
+        return null;
+    }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpObjectTypeDelegate.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpObjectTypeDelegate.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpObjectTypeDelegate.java
new file mode 100644
index 0000000..cfa6237
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpObjectTypeDelegate.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.provider.amqp.message;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+/**
+ * Interface for a Delegate object that handles storing and retrieving the Object
+ * value in an Object message.
+ */
+public interface AmqpObjectTypeDelegate {
+
+    /**
+     * Retrieve the object value held in the AMQP message, reading it back using
+     * the strategy implemented by this delegate.
+     *
+     * @return the Object recovered from the object data stored in the message.
+     *
+     * @throws IOException if reading the stored object fails.
+     * @throws ClassNotFoundException if the class of the stored type cannot be found.
+     */
+    Serializable getObject() throws IOException, ClassNotFoundException;
+
+    /**
+     * Place the given serializable instance into the AMQP message, storing it using
+     * the strategy implemented by this delegate.
+     *
+     * @param value
+     *        the serializable object instance that is to be stored in the message.
+     *
+     * @throws IOException if the store operation fails.
+     */
+    void setObject(Serializable value) throws IOException;
+
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpSerializedObjectDelegate.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpSerializedObjectDelegate.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpSerializedObjectDelegate.java
new file mode 100644
index 0000000..72db9dc
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpSerializedObjectDelegate.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.provider.amqp.message;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+
+import org.apache.qpid.jms.util.ClassLoadingAwareObjectInputStream;
+import org.apache.qpid.proton.amqp.Binary;
+import org.apache.qpid.proton.amqp.messaging.Data;
+import org.apache.qpid.proton.amqp.messaging.Section;
+import org.apache.qpid.proton.message.Message;
+
+/**
+ * Wrapper around an AMQP Message instance that will be treated as a JMS ObjectMessage
+ * type.
+ */
+public class AmqpSerializedObjectDelegate implements AmqpObjectTypeDelegate {
+
+    /**
+     * Content type value that the constructor sets on the wrapped message.
+     */
+    public static final String CONTENT_TYPE = "application/x-java-serialized-object";
+
+    private final Message message;
+
+    /**
+     * Create a new delegate that uses Java serialization to store the message content.
+     * The content type of the given message is set to {@link #CONTENT_TYPE}.
+     *
+     * @param message
+     *        the AMQP message instance where the object is to be stored / read.
+     */
+    public AmqpSerializedObjectDelegate(Message message) {
+        this.message = message;
+        this.message.setContentType(CONTENT_TYPE);
+    }
+
+    /**
+     * Read the object stored in the body of the message by deserializing the bytes
+     * held in its Data section.
+     *
+     * @return the deserialized object, or null if the message has no body or the
+     *         Data section holds no binary value.
+     *
+     * @throws IOException if an error occurs while reading the stored object.
+     * @throws ClassNotFoundException if no class can be found for the stored type.
+     * @throws IllegalStateException if the message body is not a Data section.
+     */
+    @Override
+    public Serializable getObject() throws IOException, ClassNotFoundException {
+        Section body = message.getBody();
+        if (body == null) {
+            return null;
+        }
+
+        if (!(body instanceof Data)) {
+            throw new IllegalStateException("Unexpected body type: " + body.getClass().getSimpleName());
+        }
+
+        Binary bin = ((Data) body).getValue();
+        if (bin == null) {
+            return null;
+        }
+
+        // NOTE(review): this performs Java deserialization of the message body. If
+        // messages can originate from untrusted peers this is a deserialization risk;
+        // consider restricting the classes that may be loaded.
+        try (ByteArrayInputStream bais = new ByteArrayInputStream(bin.getArray(), bin.getArrayOffset(), bin.getLength());
+             ClassLoadingAwareObjectInputStream objIn = new ClassLoadingAwareObjectInputStream(bais)) {
+
+            return (Serializable) objIn.readObject();
+        }
+    }
+
+    /**
+     * Serialize the given value and store the resulting bytes as the Data section
+     * body of the message. A null value clears the message body instead.
+     *
+     * @param value
+     *        the serializable object to store, or null to remove the body.
+     *
+     * @throws IOException if an error occurs while serializing the value.
+     */
+    @Override
+    public void setObject(Serializable value) throws IOException {
+        if (value == null) {
+            // TODO: verify whether not sending a body is ok,
+            // send a serialized null instead if it isn't
+            message.setBody(null);
+        } else {
+            try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
+                 ObjectOutputStream oos = new ObjectOutputStream(baos)) {
+
+                oos.writeObject(value);
+                // Push any buffered data into baos before taking the snapshot; the
+                // streams themselves are closed by the try-with-resources statement.
+                oos.flush();
+
+                message.setBody(new Data(new Binary(baos.toByteArray())));
+            }
+        }
+
+        // TODO: ensure content type is [still] set?
+    }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org