You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ma...@apache.org on 2017/03/06 11:54:03 UTC
[16/22] activemq-artemis git commit: ARTEMIS-1009 Pure Message
Encoding.
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java
new file mode 100644
index 0000000..030a7a0
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java
@@ -0,0 +1,351 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.protocol.amqp.converter;
+
+import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.nio.charset.CharacterCodingException;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.PooledByteBufAllocator;
+import org.apache.activemq.artemis.api.core.ICoreMessage;
+import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
+import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerDestination;
+import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMessage;
+import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSStreamMessage;
+import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable;
+import org.apache.activemq.artemis.protocol.amqp.util.TLSEncode;
+import org.apache.qpid.proton.amqp.Binary;
+import org.apache.qpid.proton.amqp.Decimal128;
+import org.apache.qpid.proton.amqp.Decimal32;
+import org.apache.qpid.proton.amqp.Decimal64;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.UnsignedByte;
+import org.apache.qpid.proton.amqp.UnsignedInteger;
+import org.apache.qpid.proton.amqp.UnsignedLong;
+import org.apache.qpid.proton.amqp.UnsignedShort;
+import org.apache.qpid.proton.amqp.messaging.AmqpSequence;
+import org.apache.qpid.proton.amqp.messaging.AmqpValue;
+import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
+import org.apache.qpid.proton.amqp.messaging.Data;
+import org.apache.qpid.proton.amqp.messaging.Footer;
+import org.apache.qpid.proton.amqp.messaging.Header;
+import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
+import org.apache.qpid.proton.amqp.messaging.Properties;
+import org.apache.qpid.proton.amqp.messaging.Section;
+import org.apache.qpid.proton.codec.WritableBuffer;
+
+import static org.apache.activemq.artemis.api.core.Message.HDR_SCHEDULED_DELIVERY_TIME;
+import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_CONTENT_ENCODING;
+import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_CONTENT_TYPE;
+import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_FIRST_ACQUIRER;
+import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_FOOTER_PREFIX;
+import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_HEADER;
+import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_HEADER_DURABLE;
+import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_HEADER_PRIORITY;
+import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_MESSAGE_ANNOTATION_PREFIX;
+import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_REPLYTO_GROUP_ID;
+import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.OCTET_STREAM_CONTENT_TYPE;
+import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE;
+import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.createBytesMessage;
+import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.createMapMessage;
+import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.createMessage;
+import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.createObjectMessage;
+import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.createStreamMessage;
+import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.createTextMessage;
+import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.getCharsetForTextualContent;
+import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.isContentType;
+
+/**
+ * This class was created just to separate concerns on AMQPConverter.
+ * For better organization of the code.
+ * */
+public class AmqpCoreConverter {
+
+ public static ICoreMessage toCore(AMQPMessage message) throws Exception {
+
+ Section body = message.getProtonMessage().getBody();
+ ServerJMSMessage result;
+
+ if (body == null) {
+ if (isContentType(SERIALIZED_JAVA_OBJECT_CONTENT_TYPE.toString(), message.getProtonMessage())) {
+ result = createObjectMessage(message.getMessageID());
+ } else if (isContentType(OCTET_STREAM_CONTENT_TYPE, message.getProtonMessage()) || isContentType(null, message.getProtonMessage())) {
+ result = createBytesMessage(message.getMessageID());
+ } else {
+ Charset charset = getCharsetForTextualContent(message.getProtonMessage().getContentType());
+ if (charset != null) {
+ result = createTextMessage(message.getMessageID());
+ } else {
+ result = createMessage(message.getMessageID());
+ }
+ }
+ } else if (body instanceof Data) {
+ Binary payload = ((Data) body).getValue();
+
+ if (isContentType(SERIALIZED_JAVA_OBJECT_CONTENT_TYPE.toString(), message.getProtonMessage())) {
+ result = createObjectMessage(message.getMessageID(), payload.getArray(), payload.getArrayOffset(), payload.getLength());
+ } else if (isContentType(OCTET_STREAM_CONTENT_TYPE, message.getProtonMessage())) {
+ result = createBytesMessage(message.getMessageID(), payload.getArray(), payload.getArrayOffset(), payload.getLength());
+ } else {
+ Charset charset = getCharsetForTextualContent(message.getProtonMessage().getContentType());
+ if (StandardCharsets.UTF_8.equals(charset)) {
+ ByteBuffer buf = ByteBuffer.wrap(payload.getArray(), payload.getArrayOffset(), payload.getLength());
+
+ try {
+ CharBuffer chars = charset.newDecoder().decode(buf);
+ result = createTextMessage(message.getMessageID(), String.valueOf(chars));
+ } catch (CharacterCodingException e) {
+ result = createBytesMessage(message.getMessageID(), payload.getArray(), payload.getArrayOffset(), payload.getLength());
+ }
+ } else {
+ result = createBytesMessage(message.getMessageID(), payload.getArray(), payload.getArrayOffset(), payload.getLength());
+ }
+ }
+
+ } else if (body instanceof AmqpSequence) {
+ AmqpSequence sequence = (AmqpSequence) body;
+ ServerJMSStreamMessage m = createStreamMessage(message.getMessageID());
+ for (Object item : sequence.getValue()) {
+ m.writeObject(item);
+ }
+
+ result = m;
+ } else if (body instanceof AmqpValue) {
+ Object value = ((AmqpValue) body).getValue();
+ if (value == null || value instanceof String) {
+ result = createTextMessage(message.getMessageID(), (String) value);
+
+ } else if (value instanceof Binary) {
+ Binary payload = (Binary) value;
+
+ if (isContentType(SERIALIZED_JAVA_OBJECT_CONTENT_TYPE.toString(), message.getProtonMessage())) {
+ result = createObjectMessage(message.getMessageID(), payload);
+ } else {
+ result = createBytesMessage(message.getMessageID(), payload.getArray(), payload.getArrayOffset(), payload.getLength());
+ }
+
+ } else if (value instanceof List) {
+ ServerJMSStreamMessage m = createStreamMessage(message.getMessageID());
+ for (Object item : (List<Object>) value) {
+ m.writeObject(item);
+ }
+ result = m;
+ } else if (value instanceof Map) {
+ result = createMapMessage(message.getMessageID(), (Map<String, Object>) value);
+ } else {
+ ByteBuf buf = PooledByteBufAllocator.DEFAULT.heapBuffer(1024);
+ try {
+ TLSEncode.getEncoder().setByteBuffer(new NettyWritable(buf));
+ TLSEncode.getEncoder().writeObject(body);
+ result = createBytesMessage(message.getMessageID(), buf.array(), 0, buf.writerIndex());
+ } finally {
+ buf.release();
+ TLSEncode.getEncoder().setByteBuffer((WritableBuffer)null);
+ }
+ }
+ } else {
+ throw new RuntimeException("Unexpected body type: " + body.getClass());
+ }
+
+ populateMessage(result, message.getProtonMessage());
+ result.getInnerMessage().setReplyTo(message.getReplyTo());
+
+ result.encode();
+
+ return result != null ? result.getInnerMessage() : null;
+ }
+
+ protected static ServerJMSMessage populateMessage(ServerJMSMessage jms, org.apache.qpid.proton.message.Message amqp) throws Exception {
+ Header header = amqp.getHeader();
+ if (header != null) {
+ jms.setBooleanProperty(JMS_AMQP_HEADER, true);
+
+ if (header.getDurable() != null) {
+ jms.setBooleanProperty(JMS_AMQP_HEADER_DURABLE, true);
+ jms.setJMSDeliveryMode(header.getDurable().booleanValue() ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+ } else {
+ jms.setJMSDeliveryMode(DeliveryMode.NON_PERSISTENT);
+ }
+
+ if (header.getPriority() != null) {
+ jms.setBooleanProperty(JMS_AMQP_HEADER_PRIORITY, true);
+ jms.setJMSPriority(header.getPriority().intValue());
+ } else {
+ jms.setJMSPriority(javax.jms.Message.DEFAULT_PRIORITY);
+ }
+
+ if (header.getFirstAcquirer() != null) {
+ jms.setBooleanProperty(JMS_AMQP_FIRST_ACQUIRER, header.getFirstAcquirer());
+ }
+
+ if (header.getDeliveryCount() != null) {
+ // AMQP Delivery Count counts only failed delivers where JMS
+ // Delivery Count should include the original delivery in the count.
+ jms.setLongProperty("JMSXDeliveryCount", header.getDeliveryCount().longValue() + 1);
+ }
+ } else {
+ jms.setJMSPriority((byte) javax.jms.Message.DEFAULT_PRIORITY);
+ jms.setJMSDeliveryMode(DeliveryMode.NON_PERSISTENT);
+ }
+
+ final MessageAnnotations ma = amqp.getMessageAnnotations();
+ if (ma != null) {
+ for (Map.Entry<?, ?> entry : ma.getValue().entrySet()) {
+ String key = entry.getKey().toString();
+ if ("x-opt-delivery-time".equals(key) && entry.getValue() != null) {
+ long deliveryTime = ((Number) entry.getValue()).longValue();
+ jms.setLongProperty(HDR_SCHEDULED_DELIVERY_TIME.toString(), deliveryTime);
+ } else if ("x-opt-delivery-delay".equals(key) && entry.getValue() != null) {
+ long delay = ((Number) entry.getValue()).longValue();
+ if (delay > 0) {
+ jms.setLongProperty(HDR_SCHEDULED_DELIVERY_TIME.toString(), System.currentTimeMillis() + delay);
+ }
+ }
+
+ setProperty(jms, JMS_AMQP_MESSAGE_ANNOTATION_PREFIX + key, entry.getValue());
+ }
+ }
+
+ final ApplicationProperties ap = amqp.getApplicationProperties();
+ if (ap != null) {
+ for (Map.Entry<Object, Object> entry : (Set<Map.Entry<Object, Object>>) ap.getValue().entrySet()) {
+ setProperty(jms, entry.getKey().toString(), entry.getValue());
+ }
+ }
+
+ final Properties properties = amqp.getProperties();
+ if (properties != null) {
+ if (properties.getMessageId() != null) {
+ jms.setJMSMessageID(AMQPMessageIdHelper.INSTANCE.toBaseMessageIdString(properties.getMessageId()));
+ }
+ Binary userId = properties.getUserId();
+ if (userId != null) {
+ // TODO - Better Way to set this?
+ jms.setStringProperty("JMSXUserID", new String(userId.getArray(), userId.getArrayOffset(), userId.getLength(), StandardCharsets.UTF_8));
+ }
+ if (properties.getTo() != null) {
+ jms.setJMSDestination(new ServerDestination(properties.getTo()));
+ }
+ if (properties.getSubject() != null) {
+ jms.setJMSType(properties.getSubject());
+ }
+ if (properties.getReplyTo() != null) {
+ jms.setJMSReplyTo(new ServerDestination(properties.getReplyTo()));
+ }
+ if (properties.getCorrelationId() != null) {
+ jms.setJMSCorrelationID(AMQPMessageIdHelper.INSTANCE.toBaseMessageIdString(properties.getCorrelationId()));
+ }
+ if (properties.getContentType() != null) {
+ jms.setStringProperty(JMS_AMQP_CONTENT_TYPE, properties.getContentType().toString());
+ }
+ if (properties.getContentEncoding() != null) {
+ jms.setStringProperty(JMS_AMQP_CONTENT_ENCODING, properties.getContentEncoding().toString());
+ }
+ if (properties.getCreationTime() != null) {
+ jms.setJMSTimestamp(properties.getCreationTime().getTime());
+ }
+ if (properties.getGroupId() != null) {
+ jms.setStringProperty("_AMQ_GROUP_ID", properties.getGroupId());
+ }
+ if (properties.getGroupSequence() != null) {
+ jms.setIntProperty("JMSXGroupSeq", properties.getGroupSequence().intValue());
+ }
+ if (properties.getReplyToGroupId() != null) {
+ jms.setStringProperty(JMS_AMQP_REPLYTO_GROUP_ID, properties.getReplyToGroupId());
+ }
+ if (properties.getAbsoluteExpiryTime() != null) {
+ jms.setJMSExpiration(properties.getAbsoluteExpiryTime().getTime());
+ }
+ }
+
+ // If the jms expiration has not yet been set...
+ if (header != null && jms.getJMSExpiration() == 0) {
+ // Then lets try to set it based on the message ttl.
+ long ttl = javax.jms.Message.DEFAULT_TIME_TO_LIVE;
+ if (header.getTtl() != null) {
+ ttl = header.getTtl().longValue();
+ }
+
+ if (ttl == 0) {
+ jms.setJMSExpiration(0);
+ } else {
+ jms.setJMSExpiration(System.currentTimeMillis() + ttl);
+ }
+ }
+
+ final Footer fp = amqp.getFooter();
+ if (fp != null) {
+ for (Map.Entry<Object, Object> entry : (Set<Map.Entry<Object, Object>>) fp.getValue().entrySet()) {
+ String key = entry.getKey().toString();
+ setProperty(jms, JMS_AMQP_FOOTER_PREFIX + key, entry.getValue());
+ }
+ }
+
+ return jms;
+ }
+
+ private static void setProperty(javax.jms.Message msg, String key, Object value) throws JMSException {
+ if (value instanceof UnsignedLong) {
+ long v = ((UnsignedLong) value).longValue();
+ msg.setLongProperty(key, v);
+ } else if (value instanceof UnsignedInteger) {
+ long v = ((UnsignedInteger) value).longValue();
+ if (Integer.MIN_VALUE <= v && v <= Integer.MAX_VALUE) {
+ msg.setIntProperty(key, (int) v);
+ } else {
+ msg.setLongProperty(key, v);
+ }
+ } else if (value instanceof UnsignedShort) {
+ int v = ((UnsignedShort) value).intValue();
+ if (Short.MIN_VALUE <= v && v <= Short.MAX_VALUE) {
+ msg.setShortProperty(key, (short) v);
+ } else {
+ msg.setIntProperty(key, v);
+ }
+ } else if (value instanceof UnsignedByte) {
+ short v = ((UnsignedByte) value).shortValue();
+ if (Byte.MIN_VALUE <= v && v <= Byte.MAX_VALUE) {
+ msg.setByteProperty(key, (byte) v);
+ } else {
+ msg.setShortProperty(key, v);
+ }
+ } else if (value instanceof Symbol) {
+ msg.setStringProperty(key, value.toString());
+ } else if (value instanceof Decimal128) {
+ msg.setDoubleProperty(key, ((Decimal128) value).doubleValue());
+ } else if (value instanceof Decimal64) {
+ msg.setDoubleProperty(key, ((Decimal64) value).doubleValue());
+ } else if (value instanceof Decimal32) {
+ msg.setFloatProperty(key, ((Decimal32) value).floatValue());
+ } else if (value instanceof Binary) {
+ msg.setStringProperty(key, value.toString());
+ } else {
+ msg.setObjectProperty(key, value);
+ }
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/CoreAmqpConverter.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/CoreAmqpConverter.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/CoreAmqpConverter.java
new file mode 100644
index 0000000..111de8c
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/CoreAmqpConverter.java
@@ -0,0 +1,461 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.protocol.amqp.converter;
+
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageEOFException;
+import javax.jms.Queue;
+import javax.jms.TemporaryQueue;
+import javax.jms.TemporaryTopic;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Set;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.PooledByteBufAllocator;
+import org.apache.activemq.artemis.api.core.ICoreMessage;
+import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
+import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSBytesMessage;
+import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMapMessage;
+import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMessage;
+import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSObjectMessage;
+import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSStreamMessage;
+import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSTextMessage;
+import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPIllegalStateException;
+import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable;
+import org.apache.activemq.artemis.protocol.amqp.util.TLSEncode;
+import org.apache.activemq.artemis.reader.MessageUtil;
+import org.apache.qpid.proton.amqp.Binary;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.UnsignedByte;
+import org.apache.qpid.proton.amqp.UnsignedInteger;
+import org.apache.qpid.proton.amqp.messaging.AmqpSequence;
+import org.apache.qpid.proton.amqp.messaging.AmqpValue;
+import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
+import org.apache.qpid.proton.amqp.messaging.Data;
+import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations;
+import org.apache.qpid.proton.amqp.messaging.Footer;
+import org.apache.qpid.proton.amqp.messaging.Header;
+import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
+import org.apache.qpid.proton.amqp.messaging.Properties;
+import org.apache.qpid.proton.amqp.messaging.Section;
+import org.apache.qpid.proton.codec.EncoderImpl;
+import org.apache.qpid.proton.codec.WritableBuffer;
+import org.jboss.logging.Logger;
+
+import static org.apache.activemq.artemis.api.core.FilterConstants.NATIVE_MESSAGE_ID;
+import static org.apache.activemq.artemis.api.core.Message.HDR_SCHEDULED_DELIVERY_TIME;
+import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.EMPTY_BINARY;
+import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_CONTENT_ENCODING;
+import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_CONTENT_TYPE;
+import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_DELIVERY_ANNOTATION_PREFIX;
+import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_FIRST_ACQUIRER;
+import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_FOOTER_PREFIX;
+import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_HEADER;
+import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_HEADER_DURABLE;
+import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_HEADER_PRIORITY;
+import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_MESSAGE_ANNOTATION_PREFIX;
+import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_NATIVE;
+import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_PREFIX;
+import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_PROPERTIES;
+import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_REPLYTO_GROUP_ID;
+import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_DEST_TYPE_MSG_ANNOTATION;
+import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_REPLY_TO_TYPE_MSG_ANNOTATION;
+import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.QUEUE_TYPE;
+import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE;
+import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.TEMP_QUEUE_TYPE;
+import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.TEMP_TOPIC_TYPE;
+import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.TOPIC_TYPE;
+import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.toAddress;
+
+public class CoreAmqpConverter {
+
+ private static Logger logger = Logger.getLogger(CoreAmqpConverter.class);
+
+ public static AMQPMessage checkAMQP(Message message) throws Exception {
+ if (message instanceof AMQPMessage) {
+ return (AMQPMessage)message;
+ } else {
+ // It will first convert to Core, then to AMQP
+ return fromCore(message.toCore());
+ }
+ }
+
+ public static AMQPMessage fromCore(ICoreMessage coreMessage) throws Exception {
+ if (coreMessage == null) {
+ return null;
+ }
+
+ ServerJMSMessage message = ServerJMSMessage.wrapCoreMessage(coreMessage);
+ message.decode();
+
+ long messageFormat = 0;
+ Header header = null;
+ final Properties properties = new Properties();
+ Map<Symbol, Object> daMap = null;
+ final Map<Symbol, Object> maMap = new HashMap<>();
+ Map<String, Object> apMap = null;
+ Map<Object, Object> footerMap = null;
+
+ Section body = convertBody(message, maMap, properties);
+
+ if (message.getInnerMessage().isDurable()) {
+ if (header == null) {
+ header = new Header();
+ }
+ header.setDurable(true);
+ }
+ byte priority = (byte) message.getJMSPriority();
+ if (priority != javax.jms.Message.DEFAULT_PRIORITY) {
+ if (header == null) {
+ header = new Header();
+ }
+ header.setPriority(UnsignedByte.valueOf(priority));
+ }
+ String type = message.getJMSType();
+ if (type != null) {
+ properties.setSubject(type);
+ }
+ String messageId = message.getJMSMessageID();
+ if (messageId != null) {
+ try {
+ properties.setMessageId(AMQPMessageIdHelper.INSTANCE.toIdObject(messageId));
+ } catch (ActiveMQAMQPIllegalStateException e) {
+ properties.setMessageId(messageId);
+ }
+ }
+ Destination destination = message.getJMSDestination();
+ if (destination != null) {
+ properties.setTo(toAddress(destination));
+ maMap.put(JMS_DEST_TYPE_MSG_ANNOTATION, destinationType(destination));
+ }
+ Destination replyTo = message.getJMSReplyTo();
+ if (replyTo != null) {
+ properties.setReplyTo(toAddress(replyTo));
+ maMap.put(JMS_REPLY_TO_TYPE_MSG_ANNOTATION, destinationType(replyTo));
+ }
+ String correlationId = message.getJMSCorrelationID();
+ if (correlationId != null) {
+ try {
+ properties.setCorrelationId(AMQPMessageIdHelper.INSTANCE.toIdObject(correlationId));
+ } catch (ActiveMQAMQPIllegalStateException e) {
+ properties.setCorrelationId(correlationId);
+ }
+ }
+ long expiration = message.getJMSExpiration();
+ if (expiration != 0) {
+ long ttl = expiration - System.currentTimeMillis();
+ if (ttl < 0) {
+ ttl = 1;
+ }
+
+ if (header == null) {
+ header = new Header();
+ }
+ header.setTtl(new UnsignedInteger((int) ttl));
+
+ properties.setAbsoluteExpiryTime(new Date(expiration));
+ }
+ long timeStamp = message.getJMSTimestamp();
+ if (timeStamp != 0) {
+ properties.setCreationTime(new Date(timeStamp));
+ }
+
+ final Set<String> keySet = MessageUtil.getPropertyNames(message.getInnerMessage());
+ for (String key : keySet) {
+ if (key.startsWith("JMSX")) {
+ if (key.equals("JMSXUserID")) {
+ String value = message.getStringProperty(key);
+ properties.setUserId(new Binary(value.getBytes(StandardCharsets.UTF_8)));
+ continue;
+ } else if (key.equals("JMSXGroupID")) {
+ String value = message.getStringProperty(key);
+ properties.setGroupId(value);
+ continue;
+ } else if (key.equals("JMSXGroupSeq")) {
+ UnsignedInteger value = new UnsignedInteger(message.getIntProperty(key));
+ properties.setGroupSequence(value);
+ continue;
+ }
+ } else if (key.startsWith(JMS_AMQP_PREFIX)) {
+ // AMQP Message Information stored from a conversion to the Core Message
+ if (key.equals(JMS_AMQP_NATIVE)) {
+ // skip..internal use only
+ continue;
+ } else if (key.equals(JMS_AMQP_FIRST_ACQUIRER)) {
+ if (header == null) {
+ header = new Header();
+ }
+ header.setFirstAcquirer(message.getBooleanProperty(key));
+ continue;
+ } else if (key.equals(JMS_AMQP_HEADER)) {
+ if (header == null) {
+ header = new Header();
+ }
+ continue;
+ } else if (key.equals(JMS_AMQP_HEADER_DURABLE)) {
+ if (header == null) {
+ header = new Header();
+ }
+ header.setDurable(message.getInnerMessage().isDurable());
+ continue;
+ } else if (key.equals(JMS_AMQP_HEADER_PRIORITY)) {
+ if (header == null) {
+ header = new Header();
+ }
+ header.setPriority(UnsignedByte.valueOf(priority));
+ continue;
+ } else if (key.startsWith(JMS_AMQP_PROPERTIES)) {
+ continue;
+ } else if (key.startsWith(JMS_AMQP_DELIVERY_ANNOTATION_PREFIX)) {
+ if (daMap == null) {
+ daMap = new HashMap<>();
+ }
+ String name = key.substring(JMS_AMQP_DELIVERY_ANNOTATION_PREFIX.length());
+ daMap.put(Symbol.valueOf(name), message.getObjectProperty(key));
+ continue;
+ } else if (key.startsWith(JMS_AMQP_MESSAGE_ANNOTATION_PREFIX)) {
+ String name = key.substring(JMS_AMQP_MESSAGE_ANNOTATION_PREFIX.length());
+ maMap.put(Symbol.valueOf(name), message.getObjectProperty(key));
+ continue;
+ } else if (key.equals(JMS_AMQP_CONTENT_TYPE)) {
+ properties.setContentType(Symbol.getSymbol(message.getStringProperty(key)));
+ continue;
+ } else if (key.equals(JMS_AMQP_CONTENT_ENCODING)) {
+ properties.setContentEncoding(Symbol.getSymbol(message.getStringProperty(key)));
+ continue;
+ } else if (key.equals(JMS_AMQP_REPLYTO_GROUP_ID)) {
+ properties.setReplyToGroupId(message.getStringProperty(key));
+ continue;
+ } else if (key.startsWith(JMS_AMQP_FOOTER_PREFIX)) {
+ if (footerMap == null) {
+ footerMap = new HashMap<>();
+ }
+ String name = key.substring(JMS_AMQP_FOOTER_PREFIX.length());
+ footerMap.put(name, message.getObjectProperty(key));
+ continue;
+ }
+ } else if (key.equals("_AMQ_GROUP_ID")) {
+ String value = message.getStringProperty(key);
+ properties.setGroupId(value);
+ continue;
+ } else if (key.equals(NATIVE_MESSAGE_ID)) {
+ // skip..internal use only
+ continue;
+ } else if (key.endsWith(HDR_SCHEDULED_DELIVERY_TIME.toString())) {
+ // skip..remove annotation from previous inbound transformation
+ continue;
+ }
+
+ if (apMap == null) {
+ apMap = new HashMap<>();
+ }
+
+ Object objectProperty = message.getObjectProperty(key);
+ if (objectProperty instanceof byte[]) {
+ objectProperty = new Binary((byte[]) objectProperty);
+ }
+
+ apMap.put(key, objectProperty);
+ }
+
+ ByteBuf buffer = PooledByteBufAllocator.DEFAULT.heapBuffer(1024);
+
+ try {
+ EncoderImpl encoder = TLSEncode.getEncoder();
+ encoder.setByteBuffer(new NettyWritable(buffer));
+
+ if (header != null) {
+ encoder.writeObject(header);
+ }
+ if (daMap != null) {
+ encoder.writeObject(new DeliveryAnnotations(daMap));
+ }
+ if (maMap != null) {
+ encoder.writeObject(new MessageAnnotations(maMap));
+ }
+ if (properties != null) {
+ encoder.writeObject(properties);
+ }
+ if (apMap != null) {
+ encoder.writeObject(new ApplicationProperties(apMap));
+ }
+ if (body != null) {
+ encoder.writeObject(body);
+ }
+ if (footerMap != null) {
+ encoder.writeObject(new Footer(footerMap));
+ }
+
+ byte[] data = new byte[buffer.writerIndex()];
+ buffer.readBytes(data);
+
+ AMQPMessage amqpMessage = new AMQPMessage(messageFormat, data);
+ amqpMessage.setMessageID(message.getInnerMessage().getMessageID());
+ amqpMessage.setReplyTo(coreMessage.getReplyTo());
+ return amqpMessage;
+
+ } finally {
+ TLSEncode.getEncoder().setByteBuffer((WritableBuffer) null);
+ buffer.release();
+ }
+ }
+
+ private static Section convertBody(ServerJMSMessage message, Map<Symbol, Object> maMap, Properties properties) throws JMSException {
+
+ Section body = null;
+
+ if (message instanceof ServerJMSBytesMessage) {
+ Binary payload = getBinaryFromMessageBody((ServerJMSBytesMessage) message);
+
+ maMap.put(AMQPMessageSupport.JMS_MSG_TYPE, AMQPMessageSupport.JMS_BYTES_MESSAGE);
+ if (payload == null) {
+ payload = EMPTY_BINARY;
+ } else {
+ body = new AmqpValue(payload);
+ }
+ } else if (message instanceof ServerJMSTextMessage) {
+ body = new AmqpValue(((TextMessage) message).getText());
+ maMap.put(AMQPMessageSupport.JMS_MSG_TYPE, AMQPMessageSupport.JMS_TEXT_MESSAGE);
+ } else if (message instanceof ServerJMSMapMessage) {
+ body = new AmqpValue(getMapFromMessageBody((ServerJMSMapMessage) message));
+ maMap.put(AMQPMessageSupport.JMS_MSG_TYPE, AMQPMessageSupport.JMS_MAP_MESSAGE);
+ } else if (message instanceof ServerJMSStreamMessage) {
+ maMap.put(AMQPMessageSupport.JMS_MSG_TYPE, AMQPMessageSupport.JMS_STREAM_MESSAGE);
+ ArrayList<Object> list = new ArrayList<>();
+ final ServerJMSStreamMessage m = (ServerJMSStreamMessage) message;
+ try {
+ while (true) {
+ list.add(m.readObject());
+ }
+ } catch (MessageEOFException e) {
+ }
+
+ body = new AmqpSequence(list);
+ } else if (message instanceof ServerJMSObjectMessage) {
+ properties.setContentType(AMQPMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE);
+ maMap.put(AMQPMessageSupport.JMS_MSG_TYPE, AMQPMessageSupport.JMS_OBJECT_MESSAGE);
+ Binary payload = getBinaryFromMessageBody((ServerJMSObjectMessage) message);
+
+ if (payload == null) {
+ payload = EMPTY_BINARY;
+ }
+
+ body = new Data(payload);
+
+ // For a non-AMQP message we tag the outbound content type as containing
+ // a serialized Java object so that an AMQP client has a hint as to what
+ // we are sending it.
+ if (!message.propertyExists(JMS_AMQP_CONTENT_TYPE)) {
+ message.setStringProperty(JMS_AMQP_CONTENT_TYPE, SERIALIZED_JAVA_OBJECT_CONTENT_TYPE.toString());
+ }
+ } else if (message instanceof ServerJMSMessage) {
+ maMap.put(AMQPMessageSupport.JMS_MSG_TYPE, AMQPMessageSupport.JMS_MESSAGE);
+ // If this is not an AMQP message that was converted then the original encoding
+ // will be unknown so we check for special cases of messages with special data
+ // encoded into the server message body.
+ ICoreMessage internalMessage = message.getInnerMessage();
+ int readerIndex = internalMessage.getBodyBuffer().readerIndex();
+ try {
+ Object s = internalMessage.getBodyBuffer().readNullableSimpleString();
+ if (s != null) {
+ body = new AmqpValue(s.toString());
+ }
+ } catch (Throwable ignored) {
+ logger.debug("Exception ignored during conversion", ignored.getMessage(), ignored);
+ body = new AmqpValue("Conversion to AMQP error!");
+ } finally {
+ internalMessage.getBodyBuffer().readerIndex(readerIndex);
+ }
+ }
+
+ return body;
+ }
+
+ private static Binary getBinaryFromMessageBody(ServerJMSBytesMessage message) throws JMSException {
+ byte[] data = new byte[(int) message.getBodyLength()];
+ message.readBytes(data);
+ message.reset(); // Need to reset after readBytes or future readBytes
+
+ return new Binary(data);
+ }
+
+ private static Binary getBinaryFromMessageBody(ServerJMSTextMessage message) throws JMSException {
+ Binary result = null;
+ String text = message.getText();
+ if (text != null) {
+ result = new Binary(text.getBytes(StandardCharsets.UTF_8));
+ }
+
+ return result;
+ }
+
+ private static Binary getBinaryFromMessageBody(ServerJMSObjectMessage message) throws JMSException {
+ message.getInnerMessage().getBodyBuffer().resetReaderIndex();
+ int size = message.getInnerMessage().getBodyBuffer().readInt();
+ byte[] bytes = new byte[size];
+ message.getInnerMessage().getBodyBuffer().readBytes(bytes);
+
+ return new Binary(bytes);
+ }
+
+ private static Map<String, Object> getMapFromMessageBody(ServerJMSMapMessage message) throws JMSException {
+ final HashMap<String, Object> map = new LinkedHashMap<>();
+
+ @SuppressWarnings("unchecked")
+ final Enumeration<String> names = message.getMapNames();
+ while (names.hasMoreElements()) {
+ String key = names.nextElement();
+ Object value = message.getObject(key);
+ if (value instanceof byte[]) {
+ value = new Binary((byte[]) value);
+ }
+ map.put(key, value);
+ }
+
+ return map;
+ }
+
+ private static byte destinationType(Destination destination) {
+ if (destination instanceof Queue) {
+ if (destination instanceof TemporaryQueue) {
+ return TEMP_QUEUE_TYPE;
+ } else {
+ return QUEUE_TYPE;
+ }
+ } else if (destination instanceof Topic) {
+ if (destination instanceof TemporaryTopic) {
+ return TEMP_TOPIC_TYPE;
+ } else {
+ return TOPIC_TYPE;
+ }
+ }
+
+ throw new IllegalArgumentException("Unknown Destination Type passed to JMS Transformer.");
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/ProtonMessageConverter.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/ProtonMessageConverter.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/ProtonMessageConverter.java
deleted file mode 100644
index 6aa44a4..0000000
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/ProtonMessageConverter.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.protocol.amqp.converter;
-
-import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_NATIVE;
-
-import java.io.IOException;
-
-import javax.jms.BytesMessage;
-
-import org.apache.activemq.artemis.core.client.ActiveMQClientLogger;
-import org.apache.activemq.artemis.core.server.ServerMessage;
-import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSBytesMessage;
-import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMessage;
-import org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport;
-import org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPNativeOutboundTransformer;
-import org.apache.activemq.artemis.protocol.amqp.converter.message.EncodedMessage;
-import org.apache.activemq.artemis.protocol.amqp.converter.message.InboundTransformer;
-import org.apache.activemq.artemis.protocol.amqp.converter.message.JMSMappingInboundTransformer;
-import org.apache.activemq.artemis.protocol.amqp.converter.message.JMSMappingOutboundTransformer;
-import org.apache.activemq.artemis.protocol.amqp.converter.message.OutboundTransformer;
-import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable;
-import org.apache.activemq.artemis.spi.core.protocol.MessageConverter;
-import org.apache.activemq.artemis.utils.IDGenerator;
-import org.apache.qpid.proton.codec.WritableBuffer;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
-
-public class ProtonMessageConverter implements MessageConverter {
-
- public ProtonMessageConverter(IDGenerator idGenerator) {
- inboundTransformer = new JMSMappingInboundTransformer(idGenerator);
- outboundTransformer = new JMSMappingOutboundTransformer(idGenerator);
- }
-
- private final InboundTransformer inboundTransformer;
- private final OutboundTransformer outboundTransformer;
-
- @Override
- public ServerMessage inbound(Object messageSource) throws Exception {
- EncodedMessage encodedMessageSource = (EncodedMessage) messageSource;
- ServerJMSMessage transformedMessage = null;
-
- try {
- transformedMessage = inboundTransformer.transform(encodedMessageSource);
- } catch (Exception e) {
- ActiveMQClientLogger.LOGGER.debug("Transform of message using [{}] transformer, failed" + inboundTransformer.getTransformerName());
- ActiveMQClientLogger.LOGGER.trace("Transformation error:", e);
-
- throw new IOException("Failed to transform incoming delivery, skipping.");
- }
-
- transformedMessage.encode();
-
- return (ServerMessage) transformedMessage.getInnerMessage();
- }
-
- @Override
- public Object outbound(ServerMessage messageOutbound, int deliveryCount) throws Exception {
- // Useful for testing but not recommended for real life use.
- ByteBuf nettyBuffer = Unpooled.buffer(1024);
- NettyWritable buffer = new NettyWritable(nettyBuffer);
- long messageFormat = (long) outbound(messageOutbound, deliveryCount, buffer);
-
- EncodedMessage encoded = new EncodedMessage(messageFormat, nettyBuffer.array(), nettyBuffer.arrayOffset() + nettyBuffer.readerIndex(),
- nettyBuffer.readableBytes());
-
- return encoded;
- }
-
- public Object outbound(ServerMessage messageOutbound, int deliveryCount, WritableBuffer buffer) throws Exception {
- ServerJMSMessage jmsMessage = AMQPMessageSupport.wrapMessage(messageOutbound.getType(), messageOutbound, deliveryCount);
-
- jmsMessage.decode();
-
- if (jmsMessage.getBooleanProperty(JMS_AMQP_NATIVE)) {
- if (jmsMessage instanceof BytesMessage) {
- return AMQPNativeOutboundTransformer.transform(outboundTransformer, (ServerJMSBytesMessage) jmsMessage, buffer);
- } else {
- return 0;
- }
- } else {
- return outboundTransformer.transform(jmsMessage, buffer);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSBytesMessage.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSBytesMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSBytesMessage.java
index abdf808..8d473a7 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSBytesMessage.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSBytesMessage.java
@@ -19,8 +19,8 @@ package org.apache.activemq.artemis.protocol.amqp.converter.jms;
import javax.jms.BytesMessage;
import javax.jms.JMSException;
-import org.apache.activemq.artemis.core.message.impl.MessageImpl;
-import org.apache.activemq.artemis.core.message.impl.MessageInternal;
+import org.apache.activemq.artemis.api.core.ICoreMessage;
+import org.apache.activemq.artemis.core.message.impl.CoreMessage;
import static org.apache.activemq.artemis.reader.BytesMessageUtil.bytesMessageReset;
import static org.apache.activemq.artemis.reader.BytesMessageUtil.bytesReadBoolean;
@@ -49,13 +49,13 @@ import static org.apache.activemq.artemis.reader.BytesMessageUtil.bytesWriteUTF;
public class ServerJMSBytesMessage extends ServerJMSMessage implements BytesMessage {
- public ServerJMSBytesMessage(MessageInternal message, int deliveryCount) {
- super(message, deliveryCount);
+ public ServerJMSBytesMessage(ICoreMessage message) {
+ super(message);
}
@Override
public long getBodyLength() throws JMSException {
- return message.getEndOfBodyPosition() - MessageImpl.BODY_OFFSET;
+ return message.getEndOfBodyPosition() - CoreMessage.BODY_OFFSET;
}
@Override
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSMapMessage.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSMapMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSMapMessage.java
index 0268065..f72239e 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSMapMessage.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSMapMessage.java
@@ -25,9 +25,9 @@ import java.util.HashSet;
import java.util.Set;
import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException;
+import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.core.message.impl.MessageInternal;
import org.apache.activemq.artemis.utils.TypedProperties;
import static org.apache.activemq.artemis.reader.MapMessageUtil.readBodyMap;
@@ -52,8 +52,8 @@ public final class ServerJMSMapMessage extends ServerJMSMessage implements MapMe
/*
* This constructor is used to construct messages prior to sending
*/
- public ServerJMSMapMessage(MessageInternal message, int deliveryCount) {
- super(message, deliveryCount);
+ public ServerJMSMapMessage(ICoreMessage message) {
+ super(message);
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSMessage.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSMessage.java
index f9a94f5..2a52f7a 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSMessage.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSMessage.java
@@ -16,43 +16,56 @@
*/
package org.apache.activemq.artemis.protocol.amqp.converter.jms;
-import java.util.Collections;
-import java.util.Enumeration;
-
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
+import java.util.Collections;
+import java.util.Enumeration;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.core.message.impl.MessageInternal;
import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
import org.apache.activemq.artemis.reader.MessageUtil;
import static org.apache.activemq.artemis.api.core.FilterConstants.NATIVE_MESSAGE_ID;
+import static org.apache.activemq.artemis.api.core.Message.BYTES_TYPE;
+import static org.apache.activemq.artemis.api.core.Message.MAP_TYPE;
+import static org.apache.activemq.artemis.api.core.Message.OBJECT_TYPE;
+import static org.apache.activemq.artemis.api.core.Message.STREAM_TYPE;
+import static org.apache.activemq.artemis.api.core.Message.TEXT_TYPE;
public class ServerJMSMessage implements Message {
- protected final MessageInternal message;
-
- protected int deliveryCount;
-
- public MessageInternal getInnerMessage() {
- return message;
- }
+ protected final ICoreMessage message;
+ private ActiveMQBuffer readBodyBuffer;
- public ServerJMSMessage(MessageInternal message, int deliveryCount) {
+ public ServerJMSMessage(ICoreMessage message) {
this.message = message;
- this.deliveryCount = deliveryCount;
}
- public int getDeliveryCount() {
- return deliveryCount;
+ public static ServerJMSMessage wrapCoreMessage(ICoreMessage wrapped) {
+ switch (wrapped.getType()) {
+ case STREAM_TYPE:
+ return new ServerJMSStreamMessage(wrapped);
+ case BYTES_TYPE:
+ return new ServerJMSBytesMessage(wrapped);
+ case MAP_TYPE:
+ return new ServerJMSMapMessage(wrapped);
+ case TEXT_TYPE:
+ return new ServerJMSTextMessage(wrapped);
+ case OBJECT_TYPE:
+ return new ServerJMSObjectMessage(wrapped);
+ default:
+ return new ServerJMSMessage(wrapped);
+ }
}
- private ActiveMQBuffer readBodyBuffer;
+ public ICoreMessage getInnerMessage() {
+ return message;
+ }
/**
* When reading we use a protected copy so multi-threads can work fine
@@ -60,7 +73,7 @@ public class ServerJMSMessage implements Message {
protected ActiveMQBuffer getReadBodyBuffer() {
if (readBodyBuffer == null) {
// to avoid clashes between multiple threads
- readBodyBuffer = message.getBodyBufferDuplicate();
+ readBodyBuffer = message.getReadOnlyBodyBuffer();
}
return readBodyBuffer;
}
@@ -113,13 +126,13 @@ public class ServerJMSMessage implements Message {
}
@Override
- public final void setJMSCorrelationID(String correlationID) throws JMSException {
- MessageUtil.setJMSCorrelationID(message, correlationID);
+ public final String getJMSCorrelationID() throws JMSException {
+ return MessageUtil.getJMSCorrelationID(message);
}
@Override
- public final String getJMSCorrelationID() throws JMSException {
- return MessageUtil.getJMSCorrelationID(message);
+ public final void setJMSCorrelationID(String correlationID) throws JMSException {
+ MessageUtil.setJMSCorrelationID(message, correlationID);
}
@Override
@@ -140,7 +153,7 @@ public class ServerJMSMessage implements Message {
@Override
public final Destination getJMSDestination() throws JMSException {
- SimpleString sdest = message.getAddress();
+ SimpleString sdest = message.getAddressSimpleString();
if (sdest == null) {
return null;
@@ -152,7 +165,7 @@ public class ServerJMSMessage implements Message {
@Override
public final void setJMSDestination(Destination destination) throws JMSException {
if (destination == null) {
- message.setAddress(null);
+ message.setAddress((SimpleString)null);
} else {
message.setAddress(((ActiveMQDestination) destination).getSimpleAddress());
}
@@ -254,19 +267,11 @@ public class ServerJMSMessage implements Message {
@Override
public final int getIntProperty(String name) throws JMSException {
- if (MessageUtil.JMSXDELIVERYCOUNT.equals(name)) {
- return deliveryCount;
- }
-
return message.getIntProperty(name);
}
@Override
public final long getLongProperty(String name) throws JMSException {
- if (MessageUtil.JMSXDELIVERYCOUNT.equals(name)) {
- return deliveryCount;
- }
-
return message.getLongProperty(name);
}
@@ -282,10 +287,6 @@ public class ServerJMSMessage implements Message {
@Override
public final String getStringProperty(String name) throws JMSException {
- if (MessageUtil.JMSXDELIVERYCOUNT.equals(name)) {
- return String.valueOf(deliveryCount);
- }
-
return message.getStringProperty(name);
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSObjectMessage.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSObjectMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSObjectMessage.java
index d1eaac6..23ffb09 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSObjectMessage.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSObjectMessage.java
@@ -16,13 +16,12 @@
*/
package org.apache.activemq.artemis.protocol.amqp.converter.jms;
-import java.io.Serializable;
-
import javax.jms.JMSException;
import javax.jms.ObjectMessage;
+import java.io.Serializable;
+import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.api.core.Message;
-import org.apache.activemq.artemis.core.message.impl.MessageInternal;
import org.apache.qpid.proton.amqp.Binary;
public class ServerJMSObjectMessage extends ServerJMSMessage implements ObjectMessage {
@@ -31,8 +30,8 @@ public class ServerJMSObjectMessage extends ServerJMSMessage implements ObjectMe
private Binary payload;
- public ServerJMSObjectMessage(MessageInternal message, int deliveryCount) {
- super(message, deliveryCount);
+ public ServerJMSObjectMessage(ICoreMessage message) {
+ super(message);
}
@Override
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSStreamMessage.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSStreamMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSStreamMessage.java
index a53fc0e..9aaf4c3 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSStreamMessage.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSStreamMessage.java
@@ -21,9 +21,9 @@ import javax.jms.MessageEOFException;
import javax.jms.MessageFormatException;
import javax.jms.StreamMessage;
+import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.Pair;
-import org.apache.activemq.artemis.core.message.impl.MessageInternal;
import org.apache.activemq.artemis.utils.DataConstants;
import static org.apache.activemq.artemis.reader.StreamMessageUtil.streamReadBoolean;
@@ -44,8 +44,8 @@ public final class ServerJMSStreamMessage extends ServerJMSMessage implements St
private int bodyLength = 0;
- public ServerJMSStreamMessage(MessageInternal message, int deliveryCount) {
- super(message, deliveryCount);
+ public ServerJMSStreamMessage(ICoreMessage message) {
+ super(message);
}
// StreamMessage implementation ----------------------------------
@@ -180,7 +180,7 @@ public final class ServerJMSStreamMessage extends ServerJMSMessage implements St
@Override
public Object readObject() throws JMSException {
- if (getReadBodyBuffer().readerIndex() >= message.getEndOfBodyPosition()) {
+ if (getReadBodyBuffer().readerIndex() >= getReadBodyBuffer().writerIndex()) {
throw new MessageEOFException("");
}
try {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSTextMessage.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSTextMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSTextMessage.java
index eb88de0..f770185 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSTextMessage.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSTextMessage.java
@@ -19,9 +19,9 @@ package org.apache.activemq.artemis.protocol.amqp.converter.jms;
import javax.jms.JMSException;
import javax.jms.TextMessage;
+import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.core.message.impl.MessageInternal;
import static org.apache.activemq.artemis.reader.TextMessageUtil.readBodyText;
import static org.apache.activemq.artemis.reader.TextMessageUtil.writeBodyText;
@@ -49,8 +49,8 @@ public class ServerJMSTextMessage extends ServerJMSMessage implements TextMessag
/*
* This constructor is used to construct messages prior to sending
*/
- public ServerJMSTextMessage(MessageInternal message, int deliveryCount) {
- super(message, deliveryCount);
+ public ServerJMSTextMessage(ICoreMessage message) {
+ super(message);
}
// TextMessage implementation ------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPContentTypeSupport.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPContentTypeSupport.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPContentTypeSupport.java
deleted file mode 100644
index 01d72c8..0000000
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPContentTypeSupport.java
+++ /dev/null
@@ -1,146 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.protocol.amqp.converter.message;
-
-import java.nio.charset.Charset;
-import java.nio.charset.IllegalCharsetNameException;
-import java.nio.charset.StandardCharsets;
-import java.nio.charset.UnsupportedCharsetException;
-import java.util.StringTokenizer;
-
-import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInvalidContentTypeException;
-
-public final class AMQPContentTypeSupport {
-
- private static final String UTF_8 = "UTF-8";
- private static final String CHARSET = "charset";
- private static final String TEXT = "text";
- private static final String APPLICATION = "application";
- private static final String JAVASCRIPT = "javascript";
- private static final String XML = "xml";
- private static final String XML_VARIANT = "+xml";
- private static final String JSON = "json";
- private static final String JSON_VARIANT = "+json";
- private static final String XML_DTD = "xml-dtd";
- private static final String ECMASCRIPT = "ecmascript";
-
- /**
- * @param contentType
- * the contentType of the received message
- * @return the character set to use, or null if not to treat the message as text
- * @throws ActiveMQAMQPInvalidContentTypeException
- * if the content-type is invalid in some way.
- */
- public static Charset parseContentTypeForTextualCharset(final String contentType) throws ActiveMQAMQPInvalidContentTypeException {
- if (contentType == null || contentType.trim().isEmpty()) {
- throw new ActiveMQAMQPInvalidContentTypeException("Content type can't be null or empty");
- }
-
- int subTypeSeparator = contentType.indexOf("/");
- if (subTypeSeparator == -1) {
- throw new ActiveMQAMQPInvalidContentTypeException("Content type has no '/' separator: " + contentType);
- }
-
- final String type = contentType.substring(0, subTypeSeparator).toLowerCase().trim();
-
- String subTypePart = contentType.substring(subTypeSeparator + 1).toLowerCase().trim();
-
- String parameterPart = null;
- int parameterSeparator = subTypePart.indexOf(";");
- if (parameterSeparator != -1) {
- if (parameterSeparator < subTypePart.length() - 1) {
- parameterPart = contentType.substring(subTypeSeparator + 1).toLowerCase().trim();
- }
- subTypePart = subTypePart.substring(0, parameterSeparator).trim();
- }
-
- if (subTypePart.isEmpty()) {
- throw new ActiveMQAMQPInvalidContentTypeException("Content type has no subtype after '/'" + contentType);
- }
-
- final String subType = subTypePart;
-
- if (isTextual(type, subType)) {
- String charset = findCharset(parameterPart);
- if (charset == null) {
- charset = UTF_8;
- }
-
- if (UTF_8.equals(charset)) {
- return StandardCharsets.UTF_8;
- } else {
- try {
- return Charset.forName(charset);
- } catch (IllegalCharsetNameException icne) {
- throw new ActiveMQAMQPInvalidContentTypeException("Illegal charset: " + charset);
- } catch (UnsupportedCharsetException uce) {
- throw new ActiveMQAMQPInvalidContentTypeException("Unsupported charset: " + charset);
- }
- }
- }
-
- return null;
- }
-
- // ----- Internal Content Type utilities ----------------------------------//
-
- private static boolean isTextual(String type, String subType) {
- if (TEXT.equals(type)) {
- return true;
- }
-
- if (APPLICATION.equals(type)) {
- if (XML.equals(subType) || JSON.equals(subType) || JAVASCRIPT.equals(subType) || subType.endsWith(XML_VARIANT) || subType.endsWith(JSON_VARIANT)
- || XML_DTD.equals(subType) || ECMASCRIPT.equals(subType)) {
- return true;
- }
- }
-
- return false;
- }
-
- private static String findCharset(String paramaterPart) {
- String charset = null;
-
- if (paramaterPart != null) {
- StringTokenizer tokenizer = new StringTokenizer(paramaterPart, ";");
- while (tokenizer.hasMoreTokens()) {
- String parameter = tokenizer.nextToken().trim();
- int eqIndex = parameter.indexOf('=');
- if (eqIndex != -1) {
- String name = parameter.substring(0, eqIndex);
- if (CHARSET.equalsIgnoreCase(name.trim())) {
- String value = unquote(parameter.substring(eqIndex + 1));
-
- charset = value.toUpperCase();
- break;
- }
- }
- }
- }
-
- return charset;
- }
-
- private static String unquote(String s) {
- if (s.length() > 1 && (s.startsWith("\"") && s.endsWith("\""))) {
- return s.substring(1, s.length() - 1);
- } else {
- return s;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fe0ca4d8/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageIdHelper.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageIdHelper.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageIdHelper.java
deleted file mode 100644
index 4a2123d..0000000
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageIdHelper.java
+++ /dev/null
@@ -1,252 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.activemq.artemis.protocol.amqp.converter.message;
-
-import java.nio.ByteBuffer;
-import java.util.UUID;
-
-import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPIllegalStateException;
-import org.apache.qpid.proton.amqp.Binary;
-import org.apache.qpid.proton.amqp.UnsignedLong;
-
-/**
- * Helper class for identifying and converting message-id and correlation-id values between the
- * AMQP types and the Strings values used by JMS.
- * <p>
- * AMQP messages allow for 4 types of message-id/correlation-id: message-id-string,
- * message-id-binary, message-id-uuid, or message-id-ulong. In order to accept or return a
- * string representation of these for interoperability with other AMQP clients, the following
- * encoding can be used after removing or before adding the "ID:" prefix used for a JMSMessageID
- * value:<br>
- * <p>
- * {@literal "AMQP_BINARY:<hex representation of binary content>"}<br>
- * {@literal "AMQP_UUID:<string representation of uuid>"}<br>
- * {@literal "AMQP_ULONG:<string representation of ulong>"}<br>
- * {@literal "AMQP_STRING:<string>"}<br>
- * <p>
- * The AMQP_STRING encoding exists only for escaping message-id-string values that happen to
- * begin with one of the encoding prefixes (including AMQP_STRING itself). It MUST NOT be used
- * otherwise.
- * <p>
- * When provided a string for conversion which attempts to identify itself as an encoded binary,
- * uuid, or ulong but can't be converted into the indicated format, an exception will be thrown.
- */
-public class AMQPMessageIdHelper {
-
- public static final AMQPMessageIdHelper INSTANCE = new AMQPMessageIdHelper();
-
- public static final String AMQP_STRING_PREFIX = "AMQP_STRING:";
- public static final String AMQP_UUID_PREFIX = "AMQP_UUID:";
- public static final String AMQP_ULONG_PREFIX = "AMQP_ULONG:";
- public static final String AMQP_BINARY_PREFIX = "AMQP_BINARY:";
-
- private static final int AMQP_UUID_PREFIX_LENGTH = AMQP_UUID_PREFIX.length();
- private static final int AMQP_ULONG_PREFIX_LENGTH = AMQP_ULONG_PREFIX.length();
- private static final int AMQP_STRING_PREFIX_LENGTH = AMQP_STRING_PREFIX.length();
- private static final int AMQP_BINARY_PREFIX_LENGTH = AMQP_BINARY_PREFIX.length();
- private static final char[] HEX_CHARS = "0123456789ABCDEF".toCharArray();
-
- /**
- * Takes the provided AMQP messageId style object, and convert it to a base string. Encodes
- * type information as a prefix where necessary to convey or escape the type of the provided
- * object.
- *
- * @param messageId
- * the raw messageId object to process
- * @return the base string to be used in creating the actual id.
- */
- public String toBaseMessageIdString(Object messageId) {
- if (messageId == null) {
- return null;
- } else if (messageId instanceof String) {
- String stringId = (String) messageId;
-
- // If the given string has a type encoding prefix,
- // we need to escape it as an encoded string (even if
- // the existing encoding prefix was also for string)
- if (hasTypeEncodingPrefix(stringId)) {
- return AMQP_STRING_PREFIX + stringId;
- } else {
- return stringId;
- }
- } else if (messageId instanceof UUID) {
- return AMQP_UUID_PREFIX + messageId.toString();
- } else if (messageId instanceof UnsignedLong) {
- return AMQP_ULONG_PREFIX + messageId.toString();
- } else if (messageId instanceof Binary) {
- ByteBuffer dup = ((Binary) messageId).asByteBuffer();
-
- byte[] bytes = new byte[dup.remaining()];
- dup.get(bytes);
-
- String hex = convertBinaryToHexString(bytes);
-
- return AMQP_BINARY_PREFIX + hex;
- } else {
- throw new IllegalArgumentException("Unsupported type provided: " + messageId.getClass());
- }
- }
-
- /**
- * Takes the provided base id string and return the appropriate amqp messageId style object.
- * Converts the type based on any relevant encoding information found as a prefix.
- *
- * @param baseId
- * the object to be converted to an AMQP MessageId value.
- * @return the AMQP messageId style object
- * @throws ActiveMQAMQPIllegalStateException
- * if the provided baseId String indicates an encoded type but can't be converted to
- * that type.
- */
- public Object toIdObject(String baseId) throws ActiveMQAMQPIllegalStateException {
- if (baseId == null) {
- return null;
- }
-
- try {
- if (hasAmqpUuidPrefix(baseId)) {
- String uuidString = strip(baseId, AMQP_UUID_PREFIX_LENGTH);
- return UUID.fromString(uuidString);
- } else if (hasAmqpUlongPrefix(baseId)) {
- String longString = strip(baseId, AMQP_ULONG_PREFIX_LENGTH);
- return UnsignedLong.valueOf(longString);
- } else if (hasAmqpStringPrefix(baseId)) {
- return strip(baseId, AMQP_STRING_PREFIX_LENGTH);
- } else if (hasAmqpBinaryPrefix(baseId)) {
- String hexString = strip(baseId, AMQP_BINARY_PREFIX_LENGTH);
- byte[] bytes = convertHexStringToBinary(hexString);
- return new Binary(bytes);
- } else {
- // We have a string without any type prefix, transmit it as-is.
- return baseId;
- }
- } catch (IllegalArgumentException e) {
- throw new ActiveMQAMQPIllegalStateException("Unable to convert ID value");
- }
- }
-
- /**
- * Convert the provided hex-string into a binary representation where each byte represents
- * two characters of the hex string.
- * <p>
- * The hex characters may be upper or lower case.
- *
- * @param hexString
- * string to convert to a binary value.
- * @return a byte array containing the binary representation
- * @throws IllegalArgumentException
- * if the provided String is a non-even length or contains non-hex characters
- */
- public byte[] convertHexStringToBinary(String hexString) throws IllegalArgumentException {
- int length = hexString.length();
-
- // As each byte needs two characters in the hex encoding, the string must be an even
- // length.
- if (length % 2 != 0) {
- throw new IllegalArgumentException("The provided hex String must be an even length, but was of length " + length + ": " + hexString);
- }
-
- byte[] binary = new byte[length / 2];
-
- for (int i = 0; i < length; i += 2) {
- char highBitsChar = hexString.charAt(i);
- char lowBitsChar = hexString.charAt(i + 1);
-
- int highBits = hexCharToInt(highBitsChar, hexString) << 4;
- int lowBits = hexCharToInt(lowBitsChar, hexString);
-
- binary[i / 2] = (byte) (highBits + lowBits);
- }
-
- return binary;
- }
-
- /**
- * Convert the provided binary into a hex-string representation where each character
- * represents 4 bits of the provided binary, i.e each byte requires two characters.
- * <p>
- * The returned hex characters are upper-case.
- *
- * @param bytes
- * the binary value to convert to a hex String instance.
- * @return a String containing a hex representation of the bytes
- */
- public String convertBinaryToHexString(byte[] bytes) {
- // Each byte is represented as 2 chars
- StringBuilder builder = new StringBuilder(bytes.length * 2);
-
- for (byte b : bytes) {
- // The byte will be expanded to int before shifting, replicating the
- // sign bit, so mask everything beyond the first 4 bits afterwards
- int highBitsInt = (b >> 4) & 0xF;
- // We only want the first 4 bits
- int lowBitsInt = b & 0xF;
-
- builder.append(HEX_CHARS[highBitsInt]);
- builder.append(HEX_CHARS[lowBitsInt]);
- }
-
- return builder.toString();
- }
-
- // ----- Internal implementation ------------------------------------------//
-
- private boolean hasTypeEncodingPrefix(String stringId) {
- return hasAmqpBinaryPrefix(stringId) || hasAmqpUuidPrefix(stringId) || hasAmqpUlongPrefix(stringId) || hasAmqpStringPrefix(stringId);
- }
-
- private boolean hasAmqpStringPrefix(String stringId) {
- return stringId.startsWith(AMQP_STRING_PREFIX);
- }
-
- private boolean hasAmqpUlongPrefix(String stringId) {
- return stringId.startsWith(AMQP_ULONG_PREFIX);
- }
-
- private boolean hasAmqpUuidPrefix(String stringId) {
- return stringId.startsWith(AMQP_UUID_PREFIX);
- }
-
- private boolean hasAmqpBinaryPrefix(String stringId) {
- return stringId.startsWith(AMQP_BINARY_PREFIX);
- }
-
- private String strip(String id, int numChars) {
- return id.substring(numChars);
- }
-
- private int hexCharToInt(char ch, String orig) throws IllegalArgumentException {
- if (ch >= '0' && ch <= '9') {
- // subtract '0' to get difference in position as an int
- return ch - '0';
- } else if (ch >= 'A' && ch <= 'F') {
- // subtract 'A' to get difference in position as an int
- // and then add 10 for the offset of 'A'
- return ch - 'A' + 10;
- } else if (ch >= 'a' && ch <= 'f') {
- // subtract 'a' to get difference in position as an int
- // and then add 10 for the offset of 'a'
- return ch - 'a' + 10;
- }
-
- throw new IllegalArgumentException("The provided hex string contains non-hex character '" + ch + "': " + orig);
- }
-}