You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ma...@apache.org on 2016/09/27 13:54:38 UTC
[11/15] activemq-artemis git commit: ARTEMIS-751 Simplification of
the AMQP implementation
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/ActiveMQJMSVendor.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/ActiveMQJMSVendor.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/ActiveMQJMSVendor.java
new file mode 100644
index 0000000..7e3ba67
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/ActiveMQJMSVendor.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.protocol.amqp.converter;
+
+import javax.jms.BytesMessage;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.Message;
+import javax.jms.ObjectMessage;
+import javax.jms.StreamMessage;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.artemis.core.buffers.impl.ResetLimitWrappedActiveMQBuffer;
+import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerDestination;
+import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMapMessage;
+import org.apache.activemq.artemis.protocol.amqp.converter.message.JMSVendor;
+import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSBytesMessage;
+import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMessage;
+import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSObjectMessage;
+import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSStreamMessage;
+import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSTextMessage;
+import org.apache.activemq.artemis.core.server.ServerMessage;
+import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
+import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
+import org.apache.activemq.artemis.utils.IDGenerator;
+
+public class ActiveMQJMSVendor implements JMSVendor {
+
+ private final IDGenerator serverGenerator;
+
+ ActiveMQJMSVendor(IDGenerator idGenerator) {
+ this.serverGenerator = idGenerator;
+ }
+
+ @Override
+ public BytesMessage createBytesMessage() {
+ return new ServerJMSBytesMessage(newMessage(org.apache.activemq.artemis.api.core.Message.BYTES_TYPE), 0);
+ }
+
+ @Override
+ public StreamMessage createStreamMessage() {
+ return new ServerJMSStreamMessage(newMessage(org.apache.activemq.artemis.api.core.Message.STREAM_TYPE), 0);
+ }
+
+ @Override
+ public Message createMessage() {
+ return new ServerJMSMessage(newMessage(org.apache.activemq.artemis.api.core.Message.DEFAULT_TYPE), 0);
+ }
+
+ @Override
+ public TextMessage createTextMessage() {
+ return new ServerJMSTextMessage(newMessage(org.apache.activemq.artemis.api.core.Message.TEXT_TYPE), 0);
+ }
+
+ @Override
+ public ObjectMessage createObjectMessage() {
+ return new ServerJMSObjectMessage(newMessage(org.apache.activemq.artemis.api.core.Message.OBJECT_TYPE), 0);
+ }
+
+ @Override
+ public MapMessage createMapMessage() {
+ return new ServerJMSMapMessage(newMessage(org.apache.activemq.artemis.api.core.Message.MAP_TYPE), 0);
+ }
+
+ @Override
+ public void setJMSXUserID(Message message, String s) {
+ }
+
+ @Override
+ public Destination createDestination(String name) {
+ return new ServerDestination(name);
+ }
+
+ @Override
+ public void setJMSXGroupID(Message message, String s) {
+ try {
+ message.setStringProperty("_AMQ_GROUP_ID", s);
+ }
+ catch (JMSException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void setJMSXGroupSequence(Message message, int i) {
+ try {
+ message.setIntProperty("JMSXGroupSeq", i);
+ }
+ catch (JMSException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void setJMSXDeliveryCount(Message message, long l) {
+ try {
+ message.setLongProperty("JMSXDeliveryCount", l);
+ }
+ catch (JMSException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public ServerJMSMessage wrapMessage(int messageType, ServerMessage wrapped, int deliveryCount) {
+ switch (messageType) {
+ case org.apache.activemq.artemis.api.core.Message.STREAM_TYPE:
+ return new ServerJMSStreamMessage(wrapped, deliveryCount);
+ case org.apache.activemq.artemis.api.core.Message.BYTES_TYPE:
+ return new ServerJMSBytesMessage(wrapped, deliveryCount);
+ case org.apache.activemq.artemis.api.core.Message.MAP_TYPE:
+ return new ServerJMSMapMessage(wrapped, deliveryCount);
+ case org.apache.activemq.artemis.api.core.Message.TEXT_TYPE:
+ return new ServerJMSTextMessage(wrapped, deliveryCount);
+ case org.apache.activemq.artemis.api.core.Message.OBJECT_TYPE:
+ return new ServerJMSObjectMessage(wrapped, deliveryCount);
+ default:
+ return new ServerJMSMessage(wrapped, deliveryCount);
+ }
+ }
+
+ @Override
+ public String toAddress(Destination destination) {
+ if (destination instanceof ActiveMQDestination) {
+ return ((ActiveMQDestination) destination).getAddress();
+ }
+ return null;
+ }
+
+ private ServerMessageImpl newMessage(byte messageType) {
+ ServerMessageImpl message = new ServerMessageImpl(serverGenerator.generateID(), 512);
+ message.setType(messageType);
+ ((ResetLimitWrappedActiveMQBuffer) message.getBodyBuffer()).setMessage(null);
+ return message;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/ProtonMessageConverter.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/ProtonMessageConverter.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/ProtonMessageConverter.java
new file mode 100644
index 0000000..f520387
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/ProtonMessageConverter.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.protocol.amqp.converter;
+
+import org.apache.activemq.artemis.core.client.ActiveMQClientLogger;
+import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMessage;
+import org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPNativeOutboundTransformer;
+import org.apache.activemq.artemis.protocol.amqp.converter.message.EncodedMessage;
+import org.apache.activemq.artemis.protocol.amqp.converter.message.InboundTransformer;
+import org.apache.activemq.artemis.protocol.amqp.converter.message.JMSMappingInboundTransformer;
+import org.apache.activemq.artemis.protocol.amqp.converter.message.JMSMappingOutboundTransformer;
+import org.apache.activemq.artemis.core.server.ServerMessage;
+import org.apache.activemq.artemis.spi.core.protocol.MessageConverter;
+import org.apache.activemq.artemis.utils.IDGenerator;
+
+import javax.jms.BytesMessage;
+import java.io.IOException;
+
+public class ProtonMessageConverter implements MessageConverter {
+
+ ActiveMQJMSVendor activeMQJMSVendor;
+
+ private final String prefixVendor;
+
+ public ProtonMessageConverter(IDGenerator idGenerator) {
+ activeMQJMSVendor = new ActiveMQJMSVendor(idGenerator);
+ inboundTransformer = new JMSMappingInboundTransformer(activeMQJMSVendor);
+ outboundTransformer = new JMSMappingOutboundTransformer(activeMQJMSVendor);
+ prefixVendor = outboundTransformer.getPrefixVendor();
+ }
+
+ private final InboundTransformer inboundTransformer;
+ private final JMSMappingOutboundTransformer outboundTransformer;
+
+ @Override
+ public ServerMessage inbound(Object messageSource) throws Exception {
+ ServerJMSMessage jmsMessage = inboundJMSType((EncodedMessage) messageSource);
+
+ return (ServerMessage) jmsMessage.getInnerMessage();
+ }
+
+ /**
+ * Just create the JMS Part of the inbound (for testing)
+ *
+ * @param messageSource
+ * @return
+ * @throws Exception https://issues.jboss.org/browse/ENTMQ-1560
+ */
+ public ServerJMSMessage inboundJMSType(EncodedMessage messageSource) throws Exception {
+ EncodedMessage encodedMessageSource = messageSource;
+ ServerJMSMessage transformedMessage = null;
+
+ InboundTransformer transformer = inboundTransformer;
+
+ while (transformer != null) {
+ try {
+ transformedMessage = (ServerJMSMessage) transformer.transform(encodedMessageSource);
+ break;
+ }
+ catch (Exception e) {
+ ActiveMQClientLogger.LOGGER.debug("Transform of message using [{}] transformer, failed" + inboundTransformer.getTransformerName());
+ ActiveMQClientLogger.LOGGER.trace("Transformation error:", e);
+
+ transformer = transformer.getFallbackTransformer();
+ }
+ }
+
+ if (transformedMessage == null) {
+ throw new IOException("Failed to transform incoming delivery, skipping.");
+ }
+
+ transformedMessage.encode();
+
+ return transformedMessage;
+ }
+
+ @Override
+ public Object outbound(ServerMessage messageOutbound, int deliveryCount) throws Exception {
+ ServerJMSMessage jmsMessage = activeMQJMSVendor.wrapMessage(messageOutbound.getType(), messageOutbound, deliveryCount);
+
+ jmsMessage.decode();
+
+ if (jmsMessage.getBooleanProperty(prefixVendor + "NATIVE")) {
+ if (jmsMessage instanceof BytesMessage) {
+ return AMQPNativeOutboundTransformer.transform(outboundTransformer, (BytesMessage) jmsMessage);
+ }
+ else {
+ return null;
+ }
+ }
+ else {
+ return outboundTransformer.convert(jmsMessage);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerDestination.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerDestination.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerDestination.java
new file mode 100644
index 0000000..967ba08
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerDestination.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.protocol.amqp.converter.jms;
+
+import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
+
+import javax.jms.JMSException;
+import javax.jms.Queue;
+
+/**
+ * This is just here to avoid all the client checks we need with valid JMS destinations, protocol convertors don't need to
+ * adhere to the jms. semantics.
+ */
+public class ServerDestination extends ActiveMQDestination implements Queue {
+ public ServerDestination(String name) {
+ super(name, name, false, false, null);
+ }
+
+ @Override
+ public String getQueueName() throws JMSException {
+ return getName();
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSBytesMessage.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSBytesMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSBytesMessage.java
new file mode 100644
index 0000000..abdf808
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSBytesMessage.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.protocol.amqp.converter.jms;
+
+import javax.jms.BytesMessage;
+import javax.jms.JMSException;
+
+import org.apache.activemq.artemis.core.message.impl.MessageImpl;
+import org.apache.activemq.artemis.core.message.impl.MessageInternal;
+
+import static org.apache.activemq.artemis.reader.BytesMessageUtil.bytesMessageReset;
+import static org.apache.activemq.artemis.reader.BytesMessageUtil.bytesReadBoolean;
+import static org.apache.activemq.artemis.reader.BytesMessageUtil.bytesReadByte;
+import static org.apache.activemq.artemis.reader.BytesMessageUtil.bytesReadBytes;
+import static org.apache.activemq.artemis.reader.BytesMessageUtil.bytesReadChar;
+import static org.apache.activemq.artemis.reader.BytesMessageUtil.bytesReadDouble;
+import static org.apache.activemq.artemis.reader.BytesMessageUtil.bytesReadFloat;
+import static org.apache.activemq.artemis.reader.BytesMessageUtil.bytesReadInt;
+import static org.apache.activemq.artemis.reader.BytesMessageUtil.bytesReadLong;
+import static org.apache.activemq.artemis.reader.BytesMessageUtil.bytesReadShort;
+import static org.apache.activemq.artemis.reader.BytesMessageUtil.bytesReadUTF;
+import static org.apache.activemq.artemis.reader.BytesMessageUtil.bytesReadUnsignedByte;
+import static org.apache.activemq.artemis.reader.BytesMessageUtil.bytesReadUnsignedShort;
+import static org.apache.activemq.artemis.reader.BytesMessageUtil.bytesWriteBoolean;
+import static org.apache.activemq.artemis.reader.BytesMessageUtil.bytesWriteByte;
+import static org.apache.activemq.artemis.reader.BytesMessageUtil.bytesWriteBytes;
+import static org.apache.activemq.artemis.reader.BytesMessageUtil.bytesWriteChar;
+import static org.apache.activemq.artemis.reader.BytesMessageUtil.bytesWriteDouble;
+import static org.apache.activemq.artemis.reader.BytesMessageUtil.bytesWriteFloat;
+import static org.apache.activemq.artemis.reader.BytesMessageUtil.bytesWriteInt;
+import static org.apache.activemq.artemis.reader.BytesMessageUtil.bytesWriteLong;
+import static org.apache.activemq.artemis.reader.BytesMessageUtil.bytesWriteObject;
+import static org.apache.activemq.artemis.reader.BytesMessageUtil.bytesWriteShort;
+import static org.apache.activemq.artemis.reader.BytesMessageUtil.bytesWriteUTF;
+
+public class ServerJMSBytesMessage extends ServerJMSMessage implements BytesMessage {
+
+ public ServerJMSBytesMessage(MessageInternal message, int deliveryCount) {
+ super(message, deliveryCount);
+ }
+
+ @Override
+ public long getBodyLength() throws JMSException {
+ return message.getEndOfBodyPosition() - MessageImpl.BODY_OFFSET;
+ }
+
+ @Override
+ public boolean readBoolean() throws JMSException {
+ return bytesReadBoolean(getReadBodyBuffer());
+ }
+
+ @Override
+ public byte readByte() throws JMSException {
+ return bytesReadByte(getReadBodyBuffer());
+ }
+
+ @Override
+ public int readUnsignedByte() throws JMSException {
+ return bytesReadUnsignedByte(getReadBodyBuffer());
+ }
+
+ @Override
+ public short readShort() throws JMSException {
+ return bytesReadShort(getReadBodyBuffer());
+ }
+
+ @Override
+ public int readUnsignedShort() throws JMSException {
+ return bytesReadUnsignedShort(getReadBodyBuffer());
+ }
+
+ @Override
+ public char readChar() throws JMSException {
+ return bytesReadChar(getReadBodyBuffer());
+ }
+
+ @Override
+ public int readInt() throws JMSException {
+ return bytesReadInt(getReadBodyBuffer());
+ }
+
+ @Override
+ public long readLong() throws JMSException {
+ return bytesReadLong(getReadBodyBuffer());
+ }
+
+ @Override
+ public float readFloat() throws JMSException {
+ return bytesReadFloat(getReadBodyBuffer());
+ }
+
+ @Override
+ public double readDouble() throws JMSException {
+ return bytesReadDouble(getReadBodyBuffer());
+ }
+
+ @Override
+ public String readUTF() throws JMSException {
+ return bytesReadUTF(getReadBodyBuffer());
+ }
+
+ @Override
+ public int readBytes(byte[] value) throws JMSException {
+ return bytesReadBytes(getReadBodyBuffer(), value);
+ }
+
+ @Override
+ public int readBytes(byte[] value, int length) throws JMSException {
+ return bytesReadBytes(getReadBodyBuffer(), value, length);
+ }
+
+ @Override
+ public void writeBoolean(boolean value) throws JMSException {
+ bytesWriteBoolean(getWriteBodyBuffer(), value);
+
+ }
+
+ @Override
+ public void writeByte(byte value) throws JMSException {
+ bytesWriteByte(getWriteBodyBuffer(), value);
+ }
+
+ @Override
+ public void writeShort(short value) throws JMSException {
+ bytesWriteShort(getWriteBodyBuffer(), value);
+ }
+
+ @Override
+ public void writeChar(char value) throws JMSException {
+ bytesWriteChar(getWriteBodyBuffer(), value);
+ }
+
+ @Override
+ public void writeInt(int value) throws JMSException {
+ bytesWriteInt(getWriteBodyBuffer(), value);
+ }
+
+ @Override
+ public void writeLong(long value) throws JMSException {
+ bytesWriteLong(getWriteBodyBuffer(), value);
+ }
+
+ @Override
+ public void writeFloat(float value) throws JMSException {
+ bytesWriteFloat(getWriteBodyBuffer(), value);
+ }
+
+ @Override
+ public void writeDouble(double value) throws JMSException {
+ bytesWriteDouble(getWriteBodyBuffer(), value);
+ }
+
+ @Override
+ public void writeUTF(String value) throws JMSException {
+ bytesWriteUTF(getWriteBodyBuffer(), value);
+ }
+
+ @Override
+ public void writeBytes(byte[] value) throws JMSException {
+ bytesWriteBytes(getWriteBodyBuffer(), value);
+ }
+
+ @Override
+ public void writeBytes(byte[] value, int offset, int length) throws JMSException {
+ bytesWriteBytes(getWriteBodyBuffer(), value, offset, length);
+ }
+
+ @Override
+ public void writeObject(Object value) throws JMSException {
+ if (!bytesWriteObject(getWriteBodyBuffer(), value)) {
+ throw new JMSException("Can't make conversion of " + value + " to any known type");
+ }
+ }
+
+ @Override
+ public void encode() throws Exception {
+ super.encode();
+ // this is to make sure we encode the body-length before it's persisted
+ getBodyLength();
+ }
+
+ @Override
+ public void decode() throws Exception {
+ super.decode();
+
+ }
+
+ @Override
+ public void reset() throws JMSException {
+ bytesMessageReset(getReadBodyBuffer());
+ bytesMessageReset(getWriteBodyBuffer());
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSMapMessage.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSMapMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSMapMessage.java
new file mode 100644
index 0000000..548deb3
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSMapMessage.java
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.protocol.amqp.converter.jms;
+
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.MessageFormatException;
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException;
+import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.message.impl.MessageInternal;
+import org.apache.activemq.artemis.utils.TypedProperties;
+
+import static org.apache.activemq.artemis.reader.MapMessageUtil.readBodyMap;
+import static org.apache.activemq.artemis.reader.MapMessageUtil.writeBodyMap;
+
+/**
+ * ActiveMQ Artemis implementation of a JMS MapMessage.
+ */
+public final class ServerJMSMapMessage extends ServerJMSMessage implements MapMessage {
+ // Constants -----------------------------------------------------
+
+ public static final byte TYPE = Message.MAP_TYPE;
+
+ // Attributes ----------------------------------------------------
+
+ private final TypedProperties map = new TypedProperties();
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ /*
+ * This constructor is used to construct messages prior to sending
+ */
+ public ServerJMSMapMessage(MessageInternal message, int deliveryCount) {
+ super(message, deliveryCount);
+
+ }
+
+ // MapMessage implementation -------------------------------------
+
+ @Override
+ public void setBoolean(final String name, final boolean value) throws JMSException {
+ map.putBooleanProperty(new SimpleString(name), value);
+ }
+
+ @Override
+ public void setByte(final String name, final byte value) throws JMSException {
+ map.putByteProperty(new SimpleString(name), value);
+ }
+
+ @Override
+ public void setShort(final String name, final short value) throws JMSException {
+ map.putShortProperty(new SimpleString(name), value);
+ }
+
+ @Override
+ public void setChar(final String name, final char value) throws JMSException {
+ map.putCharProperty(new SimpleString(name), value);
+ }
+
+ @Override
+ public void setInt(final String name, final int value) throws JMSException {
+ map.putIntProperty(new SimpleString(name), value);
+ }
+
+ @Override
+ public void setLong(final String name, final long value) throws JMSException {
+ map.putLongProperty(new SimpleString(name), value);
+ }
+
+ @Override
+ public void setFloat(final String name, final float value) throws JMSException {
+ map.putFloatProperty(new SimpleString(name), value);
+ }
+
+ @Override
+ public void setDouble(final String name, final double value) throws JMSException {
+ map.putDoubleProperty(new SimpleString(name), value);
+ }
+
+ @Override
+ public void setString(final String name, final String value) throws JMSException {
+ map.putSimpleStringProperty(new SimpleString(name), value == null ? null : new SimpleString(value));
+ }
+
+ @Override
+ public void setBytes(final String name, final byte[] value) throws JMSException {
+ map.putBytesProperty(new SimpleString(name), value);
+ }
+
+ @Override
+ public void setBytes(final String name, final byte[] value, final int offset, final int length) throws JMSException {
+ if (offset + length > value.length) {
+ throw new JMSException("Invalid offset/length");
+ }
+ byte[] newBytes = new byte[length];
+ System.arraycopy(value, offset, newBytes, 0, length);
+ map.putBytesProperty(new SimpleString(name), newBytes);
+ }
+
+ @Override
+ public void setObject(final String name, final Object value) throws JMSException {
+ try {
+ TypedProperties.setObjectProperty(new SimpleString(name), value, map);
+ }
+ catch (ActiveMQPropertyConversionException e) {
+ throw new MessageFormatException(e.getMessage());
+ }
+ }
+
+ @Override
+ public boolean getBoolean(final String name) throws JMSException {
+ try {
+ return map.getBooleanProperty(new SimpleString(name));
+ }
+ catch (ActiveMQPropertyConversionException e) {
+ throw new MessageFormatException(e.getMessage());
+ }
+ }
+
+ @Override
+ public byte getByte(final String name) throws JMSException {
+ try {
+ return map.getByteProperty(new SimpleString(name));
+ }
+ catch (ActiveMQPropertyConversionException e) {
+ throw new MessageFormatException(e.getMessage());
+ }
+ }
+
+ @Override
+ public short getShort(final String name) throws JMSException {
+ try {
+ return map.getShortProperty(new SimpleString(name));
+ }
+ catch (ActiveMQPropertyConversionException e) {
+ throw new MessageFormatException(e.getMessage());
+ }
+ }
+
+ @Override
+ public char getChar(final String name) throws JMSException {
+ try {
+ return map.getCharProperty(new SimpleString(name));
+ }
+ catch (ActiveMQPropertyConversionException e) {
+ throw new MessageFormatException(e.getMessage());
+ }
+ }
+
+ @Override
+ public int getInt(final String name) throws JMSException {
+ try {
+ return map.getIntProperty(new SimpleString(name));
+ }
+ catch (ActiveMQPropertyConversionException e) {
+ throw new MessageFormatException(e.getMessage());
+ }
+ }
+
+ @Override
+ public long getLong(final String name) throws JMSException {
+ try {
+ return map.getLongProperty(new SimpleString(name));
+ }
+ catch (ActiveMQPropertyConversionException e) {
+ throw new MessageFormatException(e.getMessage());
+ }
+ }
+
+ @Override
+ public float getFloat(final String name) throws JMSException {
+ try {
+ return map.getFloatProperty(new SimpleString(name));
+ }
+ catch (ActiveMQPropertyConversionException e) {
+ throw new MessageFormatException(e.getMessage());
+ }
+ }
+
+ @Override
+ public double getDouble(final String name) throws JMSException {
+ try {
+ return map.getDoubleProperty(new SimpleString(name));
+ }
+ catch (ActiveMQPropertyConversionException e) {
+ throw new MessageFormatException(e.getMessage());
+ }
+ }
+
+ @Override
+ public String getString(final String name) throws JMSException {
+ try {
+ SimpleString str = map.getSimpleStringProperty(new SimpleString(name));
+ if (str == null) {
+ return null;
+ }
+ else {
+ return str.toString();
+ }
+ }
+ catch (ActiveMQPropertyConversionException e) {
+ throw new MessageFormatException(e.getMessage());
+ }
+ }
+
+ @Override
+ public byte[] getBytes(final String name) throws JMSException {
+ try {
+ return map.getBytesProperty(new SimpleString(name));
+ }
+ catch (ActiveMQPropertyConversionException e) {
+ throw new MessageFormatException(e.getMessage());
+ }
+ }
+
+ @Override
+ public Object getObject(final String name) throws JMSException {
+ Object val = map.getProperty(new SimpleString(name));
+
+ if (val instanceof SimpleString) {
+ val = ((SimpleString) val).toString();
+ }
+
+ return val;
+ }
+
+ @Override
+ public Enumeration getMapNames() throws JMSException {
+ Set<SimpleString> simplePropNames = map.getPropertyNames();
+ Set<String> propNames = new HashSet<>(simplePropNames.size());
+
+ for (SimpleString str : simplePropNames) {
+ propNames.add(str.toString());
+ }
+
+ return Collections.enumeration(propNames);
+ }
+
+ @Override
+ public boolean itemExists(final String name) throws JMSException {
+ return map.containsProperty(new SimpleString(name));
+ }
+
+ @Override
+ public void clearBody() throws JMSException {
+ super.clearBody();
+
+ map.clear();
+ }
+
+ @Override
+ public void encode() throws Exception {
+ super.encode();
+ writeBodyMap(getWriteBodyBuffer(), map);
+ }
+
+ @Override
+ public void decode() throws Exception {
+ super.decode();
+ readBodyMap(getReadBodyBuffer(), map);
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSMessage.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSMessage.java
new file mode 100644
index 0000000..d15d22b
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSMessage.java
@@ -0,0 +1,381 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.protocol.amqp.converter.jms;
+
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import java.util.Collections;
+import java.util.Enumeration;
+
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.message.impl.MessageInternal;
+import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
+import org.apache.activemq.artemis.reader.MessageUtil;
+
+public class ServerJMSMessage implements Message {
+
+ public static final String NATIVE_MESSAGE_ID = "NATIVE_MESSAGE_ID";
+
+ protected final MessageInternal message;
+
+ protected int deliveryCount;
+
+ public MessageInternal getInnerMessage() {
+ return message;
+ }
+
+ public ServerJMSMessage(MessageInternal message, int deliveryCount) {
+ this.message = message;
+ this.deliveryCount = deliveryCount;
+ }
+
+ private ActiveMQBuffer readBodyBuffer;
+
+ /** When reading we use a protected copy so multi-threads can work fine */
+ protected ActiveMQBuffer getReadBodyBuffer() {
+ if (readBodyBuffer == null) {
+ // to avoid clashes between multiple threads
+ readBodyBuffer = message.getBodyBufferDuplicate();
+ }
+ return readBodyBuffer;
+ }
+
+ /** When writing on the conversion we use the buffer directly */
+ protected ActiveMQBuffer getWriteBodyBuffer() {
+ readBodyBuffer = null; // it invalidates this buffer if anything is written
+ return message.getBodyBuffer();
+ }
+
+
+ @Override
+ public final String getJMSMessageID() throws JMSException {
+ if (message.containsProperty(NATIVE_MESSAGE_ID)) {
+ return getStringProperty(NATIVE_MESSAGE_ID);
+ }
+ return null;
+ }
+
+ @Override
+ public final void setJMSMessageID(String id) throws JMSException {
+ if (id != null) {
+ message.putStringProperty(NATIVE_MESSAGE_ID, id);
+ }
+ }
+
+ @Override
+ public final long getJMSTimestamp() throws JMSException {
+ return message.getTimestamp();
+ }
+
+ @Override
+ public final void setJMSTimestamp(long timestamp) throws JMSException {
+ message.setTimestamp(timestamp);
+ }
+
+ @Override
+ public final byte[] getJMSCorrelationIDAsBytes() throws JMSException {
+ return MessageUtil.getJMSCorrelationIDAsBytes(message);
+ }
+
+ @Override
+ public final void setJMSCorrelationIDAsBytes(byte[] correlationID) throws JMSException {
+ try {
+ MessageUtil.setJMSCorrelationIDAsBytes(message, correlationID);
+ }
+ catch (ActiveMQException e) {
+ throw new JMSException(e.getMessage());
+ }
+ }
+
+ @Override
+ public final void setJMSCorrelationID(String correlationID) throws JMSException {
+ MessageUtil.setJMSCorrelationID(message, correlationID);
+ }
+
+ @Override
+ public final String getJMSCorrelationID() throws JMSException {
+ return MessageUtil.getJMSCorrelationID(message);
+ }
+
+ @Override
+ public final Destination getJMSReplyTo() throws JMSException {
+ SimpleString reply = MessageUtil.getJMSReplyTo(message);
+ if (reply != null) {
+ return new ServerDestination(reply.toString());
+ }
+ else {
+ return null;
+ }
+ }
+
+ @Override
+ public final void setJMSReplyTo(Destination replyTo) throws JMSException {
+ MessageUtil.setJMSReplyTo(message, replyTo == null ? null : ((ActiveMQDestination) replyTo).getSimpleAddress());
+
+ }
+
+ @Override
+ public final Destination getJMSDestination() throws JMSException {
+ SimpleString sdest = message.getAddress();
+
+ if (sdest == null) {
+ return null;
+ }
+ else {
+ return new ServerDestination(sdest.toString());
+ }
+ }
+
+ @Override
+ public final void setJMSDestination(Destination destination) throws JMSException {
+ if (destination == null) {
+ message.setAddress(null);
+ }
+ else {
+ message.setAddress(((ActiveMQDestination) destination).getSimpleAddress());
+ }
+
+ }
+
+ @Override
+ public final int getJMSDeliveryMode() throws JMSException {
+ return message.isDurable() ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT;
+ }
+
+ @Override
+ public final void setJMSDeliveryMode(int deliveryMode) throws JMSException {
+ if (deliveryMode == DeliveryMode.PERSISTENT) {
+ message.setDurable(true);
+ }
+ else if (deliveryMode == DeliveryMode.NON_PERSISTENT) {
+ message.setDurable(false);
+ }
+ else {
+ throw new JMSException("Invalid mode " + deliveryMode);
+ }
+ }
+
+ @Override
+ public final boolean getJMSRedelivered() throws JMSException {
+ return false;
+ }
+
+ @Override
+ public final void setJMSRedelivered(boolean redelivered) throws JMSException {
+ // no op
+ }
+
+ @Override
+ public final String getJMSType() throws JMSException {
+ return MessageUtil.getJMSType(message);
+ }
+
+ @Override
+ public final void setJMSType(String type) throws JMSException {
+ MessageUtil.setJMSType(message, type);
+ }
+
+ @Override
+ public final long getJMSExpiration() throws JMSException {
+ return message.getExpiration();
+ }
+
+ @Override
+ public final void setJMSExpiration(long expiration) throws JMSException {
+ message.setExpiration(expiration);
+ }
+
+ @Override
+ public final long getJMSDeliveryTime() throws JMSException {
+ // no op
+ return 0;
+ }
+
+ @Override
+ public final void setJMSDeliveryTime(long deliveryTime) throws JMSException {
+ // no op
+ }
+
+ @Override
+ public final int getJMSPriority() throws JMSException {
+ return message.getPriority();
+ }
+
+ @Override
+ public final void setJMSPriority(int priority) throws JMSException {
+ message.setPriority((byte) priority);
+ }
+
+ @Override
+ public final void clearProperties() throws JMSException {
+ MessageUtil.clearProperties(message);
+
+ }
+
+ @Override
+ public final boolean propertyExists(String name) throws JMSException {
+ return MessageUtil.propertyExists(message, name);
+ }
+
+ @Override
+ public final boolean getBooleanProperty(String name) throws JMSException {
+ return message.getBooleanProperty(name);
+ }
+
+ @Override
+ public final byte getByteProperty(String name) throws JMSException {
+ return message.getByteProperty(name);
+ }
+
+ @Override
+ public final short getShortProperty(String name) throws JMSException {
+ return message.getShortProperty(name);
+ }
+
+ @Override
+ public final int getIntProperty(String name) throws JMSException {
+ if (MessageUtil.JMSXDELIVERYCOUNT.equals(name)) {
+ return deliveryCount;
+ }
+
+ return message.getIntProperty(name);
+ }
+
+ @Override
+ public final long getLongProperty(String name) throws JMSException {
+ if (MessageUtil.JMSXDELIVERYCOUNT.equals(name)) {
+ return deliveryCount;
+ }
+
+ return message.getLongProperty(name);
+ }
+
+ @Override
+ public final float getFloatProperty(String name) throws JMSException {
+ return message.getFloatProperty(name);
+ }
+
+ @Override
+ public final double getDoubleProperty(String name) throws JMSException {
+ return message.getDoubleProperty(name);
+ }
+
+ @Override
+ public final String getStringProperty(String name) throws JMSException {
+ if (MessageUtil.JMSXDELIVERYCOUNT.equals(name)) {
+ return String.valueOf(deliveryCount);
+ }
+
+ return message.getStringProperty(name);
+ }
+
+ @Override
+ public final Object getObjectProperty(String name) throws JMSException {
+ Object val = message.getObjectProperty(name);
+ if (val instanceof SimpleString) {
+ val = ((SimpleString) val).toString();
+ }
+ return val;
+ }
+
+ @Override
+ public final Enumeration getPropertyNames() throws JMSException {
+ return Collections.enumeration(MessageUtil.getPropertyNames(message));
+ }
+
+ @Override
+ public final void setBooleanProperty(String name, boolean value) throws JMSException {
+ message.putBooleanProperty(name, value);
+ }
+
+ @Override
+ public final void setByteProperty(String name, byte value) throws JMSException {
+ message.putByteProperty(name, value);
+ }
+
+ @Override
+ public final void setShortProperty(String name, short value) throws JMSException {
+ message.putShortProperty(name, value);
+ }
+
+ @Override
+ public final void setIntProperty(String name, int value) throws JMSException {
+ message.putIntProperty(name, value);
+ }
+
+ @Override
+ public final void setLongProperty(String name, long value) throws JMSException {
+ message.putLongProperty(name, value);
+ }
+
+ @Override
+ public final void setFloatProperty(String name, float value) throws JMSException {
+ message.putFloatProperty(name, value);
+ }
+
+ @Override
+ public final void setDoubleProperty(String name, double value) throws JMSException {
+ message.putDoubleProperty(name, value);
+ }
+
+ @Override
+ public final void setStringProperty(String name, String value) throws JMSException {
+ message.putStringProperty(name, value);
+ }
+
+ @Override
+ public final void setObjectProperty(String name, Object value) throws JMSException {
+ message.putObjectProperty(name, value);
+ }
+
+ @Override
+ public final void acknowledge() throws JMSException {
+ // no op
+ }
+
+ @Override
+ public void clearBody() throws JMSException {
+ message.getBodyBuffer().clear();
+ }
+
+ @Override
+ public final <T> T getBody(Class<T> c) throws JMSException {
+ // no op.. jms2 not used on the conversion
+ return null;
+ }
+
+ /**
+ * Encode the body into the internal message
+ */
+ public void encode() throws Exception {
+ message.getBodyBuffer().resetReaderIndex();
+ }
+
+ public void decode() throws Exception {
+ message.getBodyBuffer().resetReaderIndex();
+ }
+
+ @Override
+ public final boolean isBodyAssignableTo(Class c) throws JMSException {
+ // no op.. jms2 not used on the conversion
+ return false;
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSObjectMessage.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSObjectMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSObjectMessage.java
new file mode 100644
index 0000000..39e0df5
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSObjectMessage.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.protocol.amqp.converter.jms;
+
+import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.core.message.impl.MessageInternal;
+import org.apache.activemq.artemis.utils.ObjectInputStreamWithClassLoader;
+
+import javax.jms.JMSException;
+import javax.jms.ObjectMessage;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+
+
+public class ServerJMSObjectMessage extends ServerJMSMessage implements ObjectMessage {
+ private static final String DEFAULT_WHITELIST;
+ private static final String DEFAULT_BLACKLIST;
+
+ static {
+ DEFAULT_WHITELIST = System.getProperty(ObjectInputStreamWithClassLoader.WHITELIST_PROPERTY,
+ "java.lang,java.math,javax.security,java.util,org.apache.activemq,org.apache.qpid.proton.amqp");
+
+ DEFAULT_BLACKLIST = System.getProperty(ObjectInputStreamWithClassLoader.BLACKLIST_PROPERTY, null);
+ }
+ public static final byte TYPE = Message.STREAM_TYPE;
+
+ private Serializable object;
+
+ public ServerJMSObjectMessage(MessageInternal message, int deliveryCount) {
+ super(message, deliveryCount);
+ }
+
+ @Override
+ public void setObject(Serializable object) throws JMSException {
+ this.object = object;
+ }
+
+ @Override
+ public Serializable getObject() throws JMSException {
+ return object;
+ }
+
+ @Override
+ public void encode() throws Exception {
+ super.encode();
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ ObjectOutputStream ous = new ObjectOutputStream(out);
+ ous.writeObject(object);
+ getInnerMessage().getBodyBuffer().writeBytes(out.toByteArray());
+ }
+
+ @Override
+ public void decode() throws Exception {
+ super.decode();
+ int size = getInnerMessage().getBodyBuffer().readableBytes();
+ byte[] bytes = new byte[size];
+ getInnerMessage().getBodyBuffer().readBytes(bytes);
+ ObjectInputStreamWithClassLoader ois = new ObjectInputStreamWithClassLoader(new ByteArrayInputStream(bytes));
+ ois.setWhiteList(DEFAULT_WHITELIST);
+ ois.setBlackList(DEFAULT_BLACKLIST);
+ object = (Serializable) ois.readObject();
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSStreamMessage.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSStreamMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSStreamMessage.java
new file mode 100644
index 0000000..c63b701
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSStreamMessage.java
@@ -0,0 +1,364 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.protocol.amqp.converter.jms;
+
+import javax.jms.JMSException;
+import javax.jms.MessageEOFException;
+import javax.jms.MessageFormatException;
+import javax.jms.StreamMessage;
+
+import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.Pair;
+import org.apache.activemq.artemis.core.message.impl.MessageInternal;
+import org.apache.activemq.artemis.utils.DataConstants;
+
+import static org.apache.activemq.artemis.reader.StreamMessageUtil.streamReadBoolean;
+import static org.apache.activemq.artemis.reader.StreamMessageUtil.streamReadByte;
+import static org.apache.activemq.artemis.reader.StreamMessageUtil.streamReadBytes;
+import static org.apache.activemq.artemis.reader.StreamMessageUtil.streamReadChar;
+import static org.apache.activemq.artemis.reader.StreamMessageUtil.streamReadDouble;
+import static org.apache.activemq.artemis.reader.StreamMessageUtil.streamReadFloat;
+import static org.apache.activemq.artemis.reader.StreamMessageUtil.streamReadInteger;
+import static org.apache.activemq.artemis.reader.StreamMessageUtil.streamReadLong;
+import static org.apache.activemq.artemis.reader.StreamMessageUtil.streamReadObject;
+import static org.apache.activemq.artemis.reader.StreamMessageUtil.streamReadShort;
+import static org.apache.activemq.artemis.reader.StreamMessageUtil.streamReadString;
+
+public final class ServerJMSStreamMessage extends ServerJMSMessage implements StreamMessage {
+
+ public static final byte TYPE = Message.STREAM_TYPE;
+
+ private int bodyLength = 0;
+
+ public ServerJMSStreamMessage(MessageInternal message, int deliveryCount) {
+ super(message, deliveryCount);
+ }
+
+ // StreamMessage implementation ----------------------------------
+
+ @Override
+ public boolean readBoolean() throws JMSException {
+
+ try {
+ return streamReadBoolean(getReadBodyBuffer());
+ }
+ catch (IllegalStateException e) {
+ throw new MessageFormatException(e.getMessage());
+ }
+ catch (IndexOutOfBoundsException e) {
+ throw new MessageEOFException("");
+ }
+ }
+
+ @Override
+ public byte readByte() throws JMSException {
+ try {
+ return streamReadByte(getReadBodyBuffer());
+ }
+ catch (IllegalStateException e) {
+ throw new MessageFormatException(e.getMessage());
+ }
+ catch (IndexOutOfBoundsException e) {
+ throw new MessageEOFException("");
+ }
+ }
+
+ @Override
+ public short readShort() throws JMSException {
+
+ try {
+ return streamReadShort(getReadBodyBuffer());
+ }
+ catch (IllegalStateException e) {
+ throw new MessageFormatException(e.getMessage());
+ }
+ catch (IndexOutOfBoundsException e) {
+ throw new MessageEOFException("");
+ }
+ }
+
+ @Override
+ public char readChar() throws JMSException {
+
+ try {
+ return streamReadChar(getReadBodyBuffer());
+ }
+ catch (IllegalStateException e) {
+ throw new MessageFormatException(e.getMessage());
+ }
+ catch (IndexOutOfBoundsException e) {
+ throw new MessageEOFException("");
+ }
+ }
+
+ @Override
+ public int readInt() throws JMSException {
+
+ try {
+ return streamReadInteger(getReadBodyBuffer());
+ }
+ catch (IllegalStateException e) {
+ throw new MessageFormatException(e.getMessage());
+ }
+ catch (IndexOutOfBoundsException e) {
+ throw new MessageEOFException("");
+ }
+ }
+
+ @Override
+ public long readLong() throws JMSException {
+
+ try {
+ return streamReadLong(getReadBodyBuffer());
+ }
+ catch (IllegalStateException e) {
+ throw new MessageFormatException(e.getMessage());
+ }
+ catch (IndexOutOfBoundsException e) {
+ throw new MessageEOFException("");
+ }
+ }
+
+ @Override
+ public float readFloat() throws JMSException {
+
+ try {
+ return streamReadFloat(getReadBodyBuffer());
+ }
+ catch (IllegalStateException e) {
+ throw new MessageFormatException(e.getMessage());
+ }
+ catch (IndexOutOfBoundsException e) {
+ throw new MessageEOFException("");
+ }
+ }
+
+ @Override
+ public double readDouble() throws JMSException {
+
+ try {
+ return streamReadDouble(getReadBodyBuffer());
+ }
+ catch (IllegalStateException e) {
+ throw new MessageFormatException(e.getMessage());
+ }
+ catch (IndexOutOfBoundsException e) {
+ throw new MessageEOFException("");
+ }
+ }
+
+ @Override
+ public String readString() throws JMSException {
+
+ try {
+ return streamReadString(getReadBodyBuffer());
+ }
+ catch (IllegalStateException e) {
+ throw new MessageFormatException(e.getMessage());
+ }
+ catch (IndexOutOfBoundsException e) {
+ throw new MessageEOFException("");
+ }
+ }
+
+ /**
+ * len here is used to control how many more bytes to read
+ */
+ private int len = 0;
+
+ @Override
+ public int readBytes(final byte[] value) throws JMSException {
+
+ try {
+ Pair<Integer, Integer> pairRead = streamReadBytes(getReadBodyBuffer(), len, value);
+
+ len = pairRead.getA();
+ return pairRead.getB();
+ }
+ catch (IllegalStateException e) {
+ throw new MessageFormatException(e.getMessage());
+ }
+ catch (IndexOutOfBoundsException e) {
+ throw new MessageEOFException("");
+ }
+ }
+
+ @Override
+ public Object readObject() throws JMSException {
+
+ if (getReadBodyBuffer().readerIndex() >= message.getEndOfBodyPosition()) {
+ throw new MessageEOFException("");
+ }
+ try {
+ return streamReadObject(getReadBodyBuffer());
+ }
+ catch (IllegalStateException e) {
+ throw new MessageFormatException(e.getMessage());
+ }
+ catch (IndexOutOfBoundsException e) {
+ throw new MessageEOFException("");
+ }
+ }
+
+ @Override
+ public void writeBoolean(final boolean value) throws JMSException {
+
+ getWriteBodyBuffer().writeByte(DataConstants.BOOLEAN);
+ getWriteBodyBuffer().writeBoolean(value);
+ }
+
+ @Override
+ public void writeByte(final byte value) throws JMSException {
+
+ getWriteBodyBuffer().writeByte(DataConstants.BYTE);
+ getWriteBodyBuffer().writeByte(value);
+ }
+
+ @Override
+ public void writeShort(final short value) throws JMSException {
+
+ getWriteBodyBuffer().writeByte(DataConstants.SHORT);
+ getWriteBodyBuffer().writeShort(value);
+ }
+
+ @Override
+ public void writeChar(final char value) throws JMSException {
+
+ getWriteBodyBuffer().writeByte(DataConstants.CHAR);
+ getWriteBodyBuffer().writeShort((short) value);
+ }
+
+ @Override
+ public void writeInt(final int value) throws JMSException {
+
+ getWriteBodyBuffer().writeByte(DataConstants.INT);
+ getWriteBodyBuffer().writeInt(value);
+ }
+
+ @Override
+ public void writeLong(final long value) throws JMSException {
+
+ getWriteBodyBuffer().writeByte(DataConstants.LONG);
+ getWriteBodyBuffer().writeLong(value);
+ }
+
+ @Override
+ public void writeFloat(final float value) throws JMSException {
+
+ getWriteBodyBuffer().writeByte(DataConstants.FLOAT);
+ getWriteBodyBuffer().writeInt(Float.floatToIntBits(value));
+ }
+
+ @Override
+ public void writeDouble(final double value) throws JMSException {
+
+ getWriteBodyBuffer().writeByte(DataConstants.DOUBLE);
+ getWriteBodyBuffer().writeLong(Double.doubleToLongBits(value));
+ }
+
+ @Override
+ public void writeString(final String value) throws JMSException {
+
+ getWriteBodyBuffer().writeByte(DataConstants.STRING);
+ getWriteBodyBuffer().writeNullableString(value);
+ }
+
+ @Override
+ public void writeBytes(final byte[] value) throws JMSException {
+
+ getWriteBodyBuffer().writeByte(DataConstants.BYTES);
+ getWriteBodyBuffer().writeInt(value.length);
+ getWriteBodyBuffer().writeBytes(value);
+ }
+
+ @Override
+ public void writeBytes(final byte[] value, final int offset, final int length) throws JMSException {
+
+ getWriteBodyBuffer().writeByte(DataConstants.BYTES);
+ getWriteBodyBuffer().writeInt(length);
+ getWriteBodyBuffer().writeBytes(value, offset, length);
+ }
+
+ @Override
+ public void writeObject(final Object value) throws JMSException {
+ if (value instanceof String) {
+ writeString((String) value);
+ }
+ else if (value instanceof Boolean) {
+ writeBoolean((Boolean) value);
+ }
+ else if (value instanceof Byte) {
+ writeByte((Byte) value);
+ }
+ else if (value instanceof Short) {
+ writeShort((Short) value);
+ }
+ else if (value instanceof Integer) {
+ writeInt((Integer) value);
+ }
+ else if (value instanceof Long) {
+ writeLong((Long) value);
+ }
+ else if (value instanceof Float) {
+ writeFloat((Float) value);
+ }
+ else if (value instanceof Double) {
+ writeDouble((Double) value);
+ }
+ else if (value instanceof byte[]) {
+ writeBytes((byte[]) value);
+ }
+ else if (value instanceof Character) {
+ writeChar((Character) value);
+ }
+ else if (value == null) {
+ writeString(null);
+ }
+ else {
+ throw new MessageFormatException("Invalid object type: " + value.getClass());
+ }
+ }
+
+ @Override
+ public void reset() throws JMSException {
+ getWriteBodyBuffer().resetReaderIndex();
+ }
+
+ // ActiveMQRAMessage overrides ----------------------------------------
+
+ @Override
+ public void clearBody() throws JMSException {
+ super.clearBody();
+
+ getWriteBodyBuffer().clear();
+ }
+
+ @Override
+ public void decode() throws Exception {
+ super.decode();
+ }
+
+ /**
+ * Encode the body into the internal message
+ */
+ @Override
+ public void encode() throws Exception {
+ super.encode();
+ bodyLength = message.getEndOfBodyPosition();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSTextMessage.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSTextMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSTextMessage.java
new file mode 100644
index 0000000..5178dc2
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/jms/ServerJMSTextMessage.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.protocol.amqp.converter.jms;
+
+import javax.jms.JMSException;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.message.impl.MessageInternal;
+
+import static org.apache.activemq.artemis.reader.TextMessageUtil.readBodyText;
+import static org.apache.activemq.artemis.reader.TextMessageUtil.writeBodyText;
+
+/**
+ * ActiveMQ Artemis implementation of a JMS TextMessage.
+ * <br>
+ * This class was ported from SpyTextMessage in JBossMQ.
+ */
+public class ServerJMSTextMessage extends ServerJMSMessage implements TextMessage {
+ // Constants -----------------------------------------------------
+
+ public static final byte TYPE = Message.TEXT_TYPE;
+
+ // Attributes ----------------------------------------------------
+
+ // We cache it locally - it's more performant to cache as a SimpleString, the AbstractChannelBuffer write
+ // methods are more efficient for a SimpleString
+ private SimpleString text;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ /*
+ * This constructor is used to construct messages prior to sending
+ */
+ public ServerJMSTextMessage(MessageInternal message, int deliveryCount) {
+ super(message, deliveryCount);
+
+ }
+ // TextMessage implementation ------------------------------------
+
+ @Override
+ public void setText(final String text) throws JMSException {
+ if (text != null) {
+ this.text = new SimpleString(text);
+ }
+ else {
+ this.text = null;
+ }
+
+ writeBodyText(getWriteBodyBuffer(), this.text);
+ }
+
+ @Override
+ public String getText() {
+ if (text != null) {
+ return text.toString();
+ }
+ else {
+ return null;
+ }
+ }
+
+ @Override
+ public void clearBody() throws JMSException {
+ super.clearBody();
+
+ text = null;
+ }
+
+ @Override
+ public void encode() throws Exception {
+ super.encode();
+ writeBodyText(getWriteBodyBuffer(), text);
+ }
+
+ @Override
+ public void decode() throws Exception {
+ super.decode();
+ text = readBodyText(getReadBodyBuffer());
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageIdHelper.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageIdHelper.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageIdHelper.java
new file mode 100644
index 0000000..9e172fa
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageIdHelper.java
@@ -0,0 +1,257 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.activemq.artemis.protocol.amqp.converter.message;
+
+import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPIllegalStateException;
+import org.apache.qpid.proton.amqp.Binary;
+import org.apache.qpid.proton.amqp.UnsignedLong;
+
+import java.nio.ByteBuffer;
+import java.util.UUID;
+
+/**
+ * Helper class for identifying and converting message-id and correlation-id values between
+ * the AMQP types and the Strings values used by JMS.
+ * <p>
+ * <p>AMQP messages allow for 4 types of message-id/correlation-id: message-id-string, message-id-binary,
+ * message-id-uuid, or message-id-ulong. In order to accept or return a string representation of these
+ * for interoperability with other AMQP clients, the following encoding can be used after removing or
+ * before adding the "ID:" prefix used for a JMSMessageID value:<br>
+ * <p>
+ * {@literal "AMQP_BINARY:<hex representation of binary content>"}<br>
+ * {@literal "AMQP_UUID:<string representation of uuid>"}<br>
+ * {@literal "AMQP_ULONG:<string representation of ulong>"}<br>
+ * {@literal "AMQP_STRING:<string>"}<br>
+ * <p>
+ * <p>The AMQP_STRING encoding exists only for escaping message-id-string values that happen to begin
+ * with one of the encoding prefixes (including AMQP_STRING itself). It MUST NOT be used otherwise.
+ * <p>
+ * <p>When provided a string for conversion which attempts to identify itself as an encoded binary, uuid, or
+ * ulong but can't be converted into the indicated format, an exception will be thrown.
+ */
+public class AMQPMessageIdHelper {
+
+ public static final AMQPMessageIdHelper INSTANCE = new AMQPMessageIdHelper();
+
+ public static final String AMQP_STRING_PREFIX = "AMQP_STRING:";
+ public static final String AMQP_UUID_PREFIX = "AMQP_UUID:";
+ public static final String AMQP_ULONG_PREFIX = "AMQP_ULONG:";
+ public static final String AMQP_BINARY_PREFIX = "AMQP_BINARY:";
+
+ private static final int AMQP_UUID_PREFIX_LENGTH = AMQP_UUID_PREFIX.length();
+ private static final int AMQP_ULONG_PREFIX_LENGTH = AMQP_ULONG_PREFIX.length();
+ private static final int AMQP_STRING_PREFIX_LENGTH = AMQP_STRING_PREFIX.length();
+ private static final int AMQP_BINARY_PREFIX_LENGTH = AMQP_BINARY_PREFIX.length();
+ private static final char[] HEX_CHARS = "0123456789ABCDEF".toCharArray();
+
+ /**
+ * Takes the provided AMQP messageId style object, and convert it to a base string.
+ * Encodes type information as a prefix where necessary to convey or escape the type
+ * of the provided object.
+ *
+ * @param messageId the raw messageId object to process
+ * @return the base string to be used in creating the actual id.
+ */
+ public String toBaseMessageIdString(Object messageId) {
+ if (messageId == null) {
+ return null;
+ }
+ else if (messageId instanceof String) {
+ String stringId = (String) messageId;
+
+ // If the given string has a type encoding prefix,
+ // we need to escape it as an encoded string (even if
+ // the existing encoding prefix was also for string)
+ if (hasTypeEncodingPrefix(stringId)) {
+ return AMQP_STRING_PREFIX + stringId;
+ }
+ else {
+ return stringId;
+ }
+ }
+ else if (messageId instanceof UUID) {
+ return AMQP_UUID_PREFIX + messageId.toString();
+ }
+ else if (messageId instanceof UnsignedLong) {
+ return AMQP_ULONG_PREFIX + messageId.toString();
+ }
+ else if (messageId instanceof Binary) {
+ ByteBuffer dup = ((Binary) messageId).asByteBuffer();
+
+ byte[] bytes = new byte[dup.remaining()];
+ dup.get(bytes);
+
+ String hex = convertBinaryToHexString(bytes);
+
+ return AMQP_BINARY_PREFIX + hex;
+ }
+ else {
+ throw new IllegalArgumentException("Unsupported type provided: " + messageId.getClass());
+ }
+ }
+
+ /**
+ * Takes the provided base id string and return the appropriate amqp messageId style object.
+ * Converts the type based on any relevant encoding information found as a prefix.
+ *
+ * @param baseId the object to be converted to an AMQP MessageId value.
+ * @return the AMQP messageId style object
+ * @throws ActiveMQAMQPIllegalStateException if the provided baseId String indicates an encoded type but can't be converted to that type.
+ */
+ public Object toIdObject(String baseId) throws ActiveMQAMQPIllegalStateException {
+ if (baseId == null) {
+ return null;
+ }
+
+ try {
+ if (hasAmqpUuidPrefix(baseId)) {
+ String uuidString = strip(baseId, AMQP_UUID_PREFIX_LENGTH);
+ return UUID.fromString(uuidString);
+ }
+ else if (hasAmqpUlongPrefix(baseId)) {
+ String longString = strip(baseId, AMQP_ULONG_PREFIX_LENGTH);
+ return UnsignedLong.valueOf(longString);
+ }
+ else if (hasAmqpStringPrefix(baseId)) {
+ return strip(baseId, AMQP_STRING_PREFIX_LENGTH);
+ }
+ else if (hasAmqpBinaryPrefix(baseId)) {
+ String hexString = strip(baseId, AMQP_BINARY_PREFIX_LENGTH);
+ byte[] bytes = convertHexStringToBinary(hexString);
+ return new Binary(bytes);
+ }
+ else {
+ // We have a string without any type prefix, transmit it as-is.
+ return baseId;
+ }
+ }
+ catch (IllegalArgumentException e) {
+ throw new ActiveMQAMQPIllegalStateException("Unable to convert ID value");
+ }
+ }
+
+ /**
+ * Convert the provided hex-string into a binary representation where each byte represents
+ * two characters of the hex string.
+ * <p>
+ * The hex characters may be upper or lower case.
+ *
+ * @param hexString string to convert to a binary value.
+ * @return a byte array containing the binary representation
+ * @throws IllegalArgumentException if the provided String is a non-even length or contains
+ * non-hex characters
+ */
+ public byte[] convertHexStringToBinary(String hexString) throws IllegalArgumentException {
+ int length = hexString.length();
+
+ // As each byte needs two characters in the hex encoding, the string must be an even length.
+ if (length % 2 != 0) {
+ throw new IllegalArgumentException("The provided hex String must be an even length, but was of length " + length + ": " + hexString);
+ }
+
+ byte[] binary = new byte[length / 2];
+
+ for (int i = 0; i < length; i += 2) {
+ char highBitsChar = hexString.charAt(i);
+ char lowBitsChar = hexString.charAt(i + 1);
+
+ int highBits = hexCharToInt(highBitsChar, hexString) << 4;
+ int lowBits = hexCharToInt(lowBitsChar, hexString);
+
+ binary[i / 2] = (byte) (highBits + lowBits);
+ }
+
+ return binary;
+ }
+
+ /**
+ * Convert the provided binary into a hex-string representation where each character
+ * represents 4 bits of the provided binary, i.e each byte requires two characters.
+ * <p>
+ * The returned hex characters are upper-case.
+ *
+ * @param bytes the binary value to convert to a hex String instance.
+ * @return a String containing a hex representation of the bytes
+ */
+ public String convertBinaryToHexString(byte[] bytes) {
+ // Each byte is represented as 2 chars
+ StringBuilder builder = new StringBuilder(bytes.length * 2);
+
+ for (byte b : bytes) {
+ // The byte will be expanded to int before shifting, replicating the
+ // sign bit, so mask everything beyond the first 4 bits afterwards
+ int highBitsInt = (b >> 4) & 0xF;
+ // We only want the first 4 bits
+ int lowBitsInt = b & 0xF;
+
+ builder.append(HEX_CHARS[highBitsInt]);
+ builder.append(HEX_CHARS[lowBitsInt]);
+ }
+
+ return builder.toString();
+ }
+
+ //----- Internal implementation ------------------------------------------//
+
+ private boolean hasTypeEncodingPrefix(String stringId) {
+ return hasAmqpBinaryPrefix(stringId) || hasAmqpUuidPrefix(stringId) ||
+ hasAmqpUlongPrefix(stringId) || hasAmqpStringPrefix(stringId);
+ }
+
+ private boolean hasAmqpStringPrefix(String stringId) {
+ return stringId.startsWith(AMQP_STRING_PREFIX);
+ }
+
+ private boolean hasAmqpUlongPrefix(String stringId) {
+ return stringId.startsWith(AMQP_ULONG_PREFIX);
+ }
+
+ private boolean hasAmqpUuidPrefix(String stringId) {
+ return stringId.startsWith(AMQP_UUID_PREFIX);
+ }
+
+ private boolean hasAmqpBinaryPrefix(String stringId) {
+ return stringId.startsWith(AMQP_BINARY_PREFIX);
+ }
+
+ private String strip(String id, int numChars) {
+ return id.substring(numChars);
+ }
+
+ private int hexCharToInt(char ch, String orig) throws IllegalArgumentException {
+ if (ch >= '0' && ch <= '9') {
+ // subtract '0' to get difference in position as an int
+ return ch - '0';
+ }
+ else if (ch >= 'A' && ch <= 'F') {
+ // subtract 'A' to get difference in position as an int
+ // and then add 10 for the offset of 'A'
+ return ch - 'A' + 10;
+ }
+ else if (ch >= 'a' && ch <= 'f') {
+ // subtract 'a' to get difference in position as an int
+ // and then add 10 for the offset of 'a'
+ return ch - 'a' + 10;
+ }
+
+ throw new IllegalArgumentException("The provided hex string contains non-hex character '" + ch + "': " + orig);
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageTypes.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageTypes.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageTypes.java
new file mode 100644
index 0000000..613de6d
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageTypes.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.protocol.amqp.converter.message;
+
+public class AMQPMessageTypes {
+ public static final String AMQP_TYPE_KEY = "amqp:type";
+
+ public static final String AMQP_SEQUENCE = "amqp:sequence";
+
+ public static final String AMQP_LIST = "amqp:list";
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPNativeInboundTransformer.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPNativeInboundTransformer.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPNativeInboundTransformer.java
new file mode 100644
index 0000000..8a5d17c
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPNativeInboundTransformer.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.protocol.amqp.converter.message;
+
+import javax.jms.Message;
+
+public class AMQPNativeInboundTransformer extends AMQPRawInboundTransformer {
+
+ public AMQPNativeInboundTransformer(JMSVendor vendor) {
+ super(vendor);
+ }
+
+ @Override
+ public String getTransformerName() {
+ return TRANSFORMER_NATIVE;
+ }
+
+ @Override
+ public InboundTransformer getFallbackTransformer() {
+ return new AMQPRawInboundTransformer(getVendor());
+ }
+
+ @Override
+ public Message transform(EncodedMessage amqpMessage) throws Exception {
+ org.apache.qpid.proton.message.Message amqp = amqpMessage.decode();
+
+ Message rc = super.transform(amqpMessage);
+
+ populateMessage(rc, amqp);
+ return rc;
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPNativeOutboundTransformer.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPNativeOutboundTransformer.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPNativeOutboundTransformer.java
new file mode 100644
index 0000000..67175ab
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPNativeOutboundTransformer.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.protocol.amqp.converter.message;
+
+import org.apache.qpid.proton.amqp.UnsignedInteger;
+import org.apache.qpid.proton.amqp.messaging.Header;
+import org.apache.qpid.proton.message.ProtonJMessage;
+
+import javax.jms.BytesMessage;
+import javax.jms.JMSException;
+
+public class AMQPNativeOutboundTransformer extends OutboundTransformer {
+
+ public AMQPNativeOutboundTransformer(JMSVendor vendor) {
+ super(vendor);
+ }
+
+ public static ProtonJMessage transform(OutboundTransformer options, BytesMessage msg) throws JMSException {
+ byte[] data = new byte[(int) msg.getBodyLength()];
+ msg.readBytes(data);
+ msg.reset();
+ int count = msg.getIntProperty("JMSXDeliveryCount");
+
+ // decode...
+ ProtonJMessage amqp = (ProtonJMessage) org.apache.qpid.proton.message.Message.Factory.create();
+ int offset = 0;
+ int len = data.length;
+ while (len > 0) {
+ final int decoded = amqp.decode(data, offset, len);
+ assert decoded > 0 : "Make progress decoding the message";
+ offset += decoded;
+ len -= decoded;
+ }
+
+ // Update the DeliveryCount header...
+ // The AMQP delivery-count field only includes prior failed delivery attempts,
+ // whereas JMSXDeliveryCount includes the first/current delivery attempt. Subtract 1.
+ if (amqp.getHeader() == null) {
+ amqp.setHeader(new Header());
+ }
+
+ amqp.getHeader().setDeliveryCount(new UnsignedInteger(count - 1));
+
+ return amqp;
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPRawInboundTransformer.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPRawInboundTransformer.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPRawInboundTransformer.java
new file mode 100644
index 0000000..e6bf171
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPRawInboundTransformer.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.protocol.amqp.converter.message;
+
+import javax.jms.BytesMessage;
+import javax.jms.DeliveryMode;
+import javax.jms.Message;
+
+public class AMQPRawInboundTransformer extends InboundTransformer {
+
+ public AMQPRawInboundTransformer(JMSVendor vendor) {
+ super(vendor);
+ }
+
+ @Override
+ public String getTransformerName() {
+ return TRANSFORMER_RAW;
+ }
+
+ @Override
+ public InboundTransformer getFallbackTransformer() {
+ return null; // No fallback from full raw transform
+ }
+
+ @Override
+ public Message transform(EncodedMessage amqpMessage) throws Exception {
+ BytesMessage rc = vendor.createBytesMessage();
+ rc.writeBytes(amqpMessage.getArray(), amqpMessage.getArrayOffset(), amqpMessage.getLength());
+
+ // We cannot decode the message headers to check so err on the side of caution
+ // and mark all messages as persistent.
+ rc.setJMSDeliveryMode(DeliveryMode.PERSISTENT);
+ rc.setJMSPriority(defaultPriority);
+
+ final long now = System.currentTimeMillis();
+ rc.setJMSTimestamp(now);
+ if (defaultTtl > 0) {
+ rc.setJMSExpiration(now + defaultTtl);
+ }
+
+ rc.setLongProperty(prefixVendor + "MESSAGE_FORMAT", amqpMessage.getMessageFormat());
+ rc.setBooleanProperty(prefixVendor + "NATIVE", true);
+
+ return rc;
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a838bf04/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/EncodedMessage.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/EncodedMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/EncodedMessage.java
new file mode 100644
index 0000000..4a80ea6
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/EncodedMessage.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.protocol.amqp.converter.message;
+
+import org.apache.qpid.proton.amqp.Binary;
+import org.apache.qpid.proton.message.Message;
+
+public class EncodedMessage {
+
+ private final Binary data;
+ final long messageFormat;
+
+ public EncodedMessage(long messageFormat, byte[] data, int offset, int length) {
+ this.data = new Binary(data, offset, length);
+ this.messageFormat = messageFormat;
+ }
+
+ public long getMessageFormat() {
+ return messageFormat;
+ }
+
+ public Message decode() throws Exception {
+ Message amqp = Message.Factory.create();
+
+ int offset = getArrayOffset();
+ int len = getLength();
+ while (len > 0) {
+ final int decoded = amqp.decode(getArray(), offset, len);
+ assert decoded > 0 : "Make progress decoding the message";
+ offset += decoded;
+ len -= decoded;
+ }
+
+ return amqp;
+ }
+
+ public int getLength() {
+ return data.getLength();
+ }
+
+ public int getArrayOffset() {
+ return data.getArrayOffset();
+ }
+
+ public byte[] getArray() {
+ return data.getArray();
+ }
+
+ @Override
+ public String toString() {
+ return data.toString();
+ }
+}