You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2017/03/06 03:58:33 UTC
[15/21] activemq-artemis git commit: ARTEMIS-1009 Pure Message
Encoding.
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/669e7cf2/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageSupport.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageSupport.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageSupport.java
deleted file mode 100644
index 8c4612d..0000000
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageSupport.java
+++ /dev/null
@@ -1,276 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.protocol.amqp.converter.message;
-
-import static org.apache.activemq.artemis.api.core.Message.BYTES_TYPE;
-import static org.apache.activemq.artemis.api.core.Message.DEFAULT_TYPE;
-import static org.apache.activemq.artemis.api.core.Message.MAP_TYPE;
-import static org.apache.activemq.artemis.api.core.Message.OBJECT_TYPE;
-import static org.apache.activemq.artemis.api.core.Message.STREAM_TYPE;
-import static org.apache.activemq.artemis.api.core.Message.TEXT_TYPE;
-
-import java.nio.charset.Charset;
-import java.util.Arrays;
-import java.util.Map;
-import java.util.Set;
-
-import javax.jms.Destination;
-import javax.jms.JMSException;
-
-import org.apache.activemq.artemis.core.buffers.impl.ResetLimitWrappedActiveMQBuffer;
-import org.apache.activemq.artemis.core.server.ServerMessage;
-import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
-import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
-import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSBytesMessage;
-import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMapMessage;
-import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMessage;
-import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSObjectMessage;
-import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSStreamMessage;
-import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSTextMessage;
-import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInvalidContentTypeException;
-import org.apache.activemq.artemis.utils.IDGenerator;
-import org.apache.qpid.proton.amqp.Binary;
-import org.apache.qpid.proton.amqp.Symbol;
-import org.apache.qpid.proton.amqp.messaging.Data;
-import org.apache.qpid.proton.message.Message;
-
-/**
- * Support class containing constant values and static methods that are used to map to / from
- * AMQP Message types being sent or received.
- */
-public final class AMQPMessageSupport {
-
-   // Message Properties used to map AMQP to JMS and back
-
-   public static final String JMS_AMQP_PREFIX = "JMS_AMQP_";
-   public static final int JMS_AMQP_PREFIX_LENGTH = JMS_AMQP_PREFIX.length();
-
-   public static final String MESSAGE_FORMAT = "MESSAGE_FORMAT";
-   public static final String ORIGINAL_ENCODING = "ORIGINAL_ENCODING";
-   public static final String NATIVE = "NATIVE";
-   public static final String HEADER = "HEADER";
-   public static final String PROPERTIES = "PROPERTIES";
-
-   public static final String FIRST_ACQUIRER = "FirstAcquirer";
-   public static final String CONTENT_TYPE = "ContentType";
-   public static final String CONTENT_ENCODING = "ContentEncoding";
-   public static final String REPLYTO_GROUP_ID = "ReplyToGroupID";
-   public static final String DURABLE = "DURABLE";
-   public static final String PRIORITY = "PRIORITY";
-
-   public static final String DELIVERY_ANNOTATION_PREFIX = "DA_";
-   public static final String MESSAGE_ANNOTATION_PREFIX = "MA_";
-   public static final String FOOTER_PREFIX = "FT_";
-
-   // Fully prefixed property names, built from the fragments above.
-   public static final String JMS_AMQP_HEADER = JMS_AMQP_PREFIX + HEADER;
-   public static final String JMS_AMQP_HEADER_DURABLE = JMS_AMQP_PREFIX + HEADER + DURABLE;
-   public static final String JMS_AMQP_HEADER_PRIORITY = JMS_AMQP_PREFIX + HEADER + PRIORITY;
-   public static final String JMS_AMQP_PROPERTIES = JMS_AMQP_PREFIX + PROPERTIES;
-   public static final String JMS_AMQP_ORIGINAL_ENCODING = JMS_AMQP_PREFIX + ORIGINAL_ENCODING;
-   public static final String JMS_AMQP_MESSAGE_FORMAT = JMS_AMQP_PREFIX + MESSAGE_FORMAT;
-   public static final String JMS_AMQP_NATIVE = JMS_AMQP_PREFIX + NATIVE;
-   public static final String JMS_AMQP_FIRST_ACQUIRER = JMS_AMQP_PREFIX + FIRST_ACQUIRER;
-   public static final String JMS_AMQP_CONTENT_TYPE = JMS_AMQP_PREFIX + CONTENT_TYPE;
-   public static final String JMS_AMQP_CONTENT_ENCODING = JMS_AMQP_PREFIX + CONTENT_ENCODING;
-   public static final String JMS_AMQP_REPLYTO_GROUP_ID = JMS_AMQP_PREFIX + REPLYTO_GROUP_ID;
-   public static final String JMS_AMQP_DELIVERY_ANNOTATION_PREFIX = JMS_AMQP_PREFIX + DELIVERY_ANNOTATION_PREFIX;
-   public static final String JMS_AMQP_MESSAGE_ANNOTATION_PREFIX = JMS_AMQP_PREFIX + MESSAGE_ANNOTATION_PREFIX;
-   public static final String JMS_AMQP_FOOTER_PREFIX = JMS_AMQP_PREFIX + FOOTER_PREFIX;
-
-   // Message body type definitions
-   public static final Binary EMPTY_BINARY = new Binary(new byte[0]);
-   public static final Data EMPTY_BODY = new Data(EMPTY_BINARY);
-
-   public static final short AMQP_UNKNOWN = 0;
-   public static final short AMQP_NULL = 1;
-   public static final short AMQP_DATA = 2;
-   public static final short AMQP_SEQUENCE = 3;
-   public static final short AMQP_VALUE_NULL = 4;
-   public static final short AMQP_VALUE_STRING = 5;
-   public static final short AMQP_VALUE_BINARY = 6;
-   public static final short AMQP_VALUE_MAP = 7;
-   public static final short AMQP_VALUE_LIST = 8;
-
-   /**
-    * Content type used to mark Data sections as containing a serialized java object.
-    */
-   public static final String SERIALIZED_JAVA_OBJECT_CONTENT_TYPE = "application/x-java-serialized-object";
-
-   /**
-    * Content type used to mark Data sections as containing arbitrary bytes.
-    */
-   public static final String OCTET_STREAM_CONTENT_TYPE = "application/octet-stream";
-
-   /**
-    * Lookup and return the correct Proton Symbol instance based on the given key.
-    *
-    * @param key
-    *        the String value name of the Symbol to locate.
-    *
-    * @return the Symbol value that matches the given key.
-    */
-   public static Symbol getSymbol(String key) {
-      return Symbol.valueOf(key);
-   }
-
-   /**
-    * Safe way to access message annotations which will check internal structure and either
-    * return the annotation if it exists or null if the annotation or any annotations are
-    * not present.
-    *
-    * @param key
-    *        the String key to use to lookup an annotation.
-    * @param message
-    *        the AMQP message object that is being examined, may be null.
-    *
-    * @return the given annotation value or null if not present in the message.
-    */
-   public static Object getMessageAnnotation(String key, Message message) {
-      if (message != null && message.getMessageAnnotations() != null) {
-         Map<Symbol, Object> annotations = message.getMessageAnnotations().getValue();
-         // Guard against an annotations section whose underlying map is null.
-         if (annotations != null) {
-            return annotations.get(AMQPMessageSupport.getSymbol(key));
-         }
-      }
-
-      return null;
-   }
-
-   /**
-    * Check whether the content-type field of the properties section (if present) in the given
-    * message matches the provided string (where null matches if there is no content type
-    * present).
-    *
-    * @param contentType
-    *        content type string to compare against, or null if none
-    * @param message
-    *        the AMQP message object that is being examined; must not be null.
-    *
-    * @return true if content type matches
-    */
-   public static boolean isContentType(String contentType, Message message) {
-      if (contentType == null) {
-         return message.getContentType() == null;
-      } else {
-         return contentType.equals(message.getContentType());
-      }
-   }
-
-   /**
-    * Resolves the character set for a textual content type.
-    *
-    * @param contentType
-    *        the contentType of the received message
-    * @return the character set to use, or null if not to treat the message as text (this
-    *         includes the case where the content type is rejected as invalid by the parser)
-    */
-   public static Charset getCharsetForTextualContent(String contentType) {
-      try {
-         return AMQPContentTypeSupport.parseContentTypeForTextualCharset(contentType);
-      } catch (ActiveMQAMQPInvalidContentTypeException e) {
-         // An unparsable content type is deliberately treated as "not text".
-         return null;
-      }
-   }
-
-   /**
-    * Wraps a core message in the ServerJMS* wrapper that matches the given message type.
-    *
-    * @param messageType
-    *        one of the core Message *_TYPE constants
-    * @param wrapped
-    *        the core message to wrap
-    * @param deliveryCount
-    *        delivery count handed to the wrapper
-    * @return the type specific wrapper, or a plain ServerJMSMessage for any other type value
-    */
-   public static ServerJMSMessage wrapMessage(int messageType, ServerMessage wrapped, int deliveryCount) {
-      switch (messageType) {
-         case STREAM_TYPE:
-            return new ServerJMSStreamMessage(wrapped, deliveryCount);
-         case BYTES_TYPE:
-            return new ServerJMSBytesMessage(wrapped, deliveryCount);
-         case MAP_TYPE:
-            return new ServerJMSMapMessage(wrapped, deliveryCount);
-         case TEXT_TYPE:
-            return new ServerJMSTextMessage(wrapped, deliveryCount);
-         case OBJECT_TYPE:
-            return new ServerJMSObjectMessage(wrapped, deliveryCount);
-         default:
-            return new ServerJMSMessage(wrapped, deliveryCount);
-      }
-   }
-
-   /**
-    * @param destination
-    *        the JMS destination to inspect, may be null
-    * @return the address of an ActiveMQDestination, or null for any other (or null) destination
-    */
-   public static String toAddress(Destination destination) {
-      if (destination instanceof ActiveMQDestination) {
-         return ((ActiveMQDestination) destination).getAddress();
-      }
-      return null;
-   }
-
-   /**
-    * Creates an empty bytes message with a freshly generated id and a delivery count of 0.
-    */
-   public static ServerJMSBytesMessage createBytesMessage(IDGenerator idGenerator) {
-      return new ServerJMSBytesMessage(newMessage(idGenerator, BYTES_TYPE), 0);
-   }
-
-   /**
-    * Creates a bytes message and writes {@code length} bytes of {@code array}, starting at
-    * {@code arrayOffset}, into it.
-    */
-   public static ServerJMSMessage createBytesMessage(IDGenerator idGenerator, byte[] array, int arrayOffset, int length) throws JMSException {
-      ServerJMSBytesMessage message = createBytesMessage(idGenerator);
-      message.writeBytes(array, arrayOffset, length);
-      return message;
-   }
-
-   /**
-    * Creates an empty stream message with a freshly generated id and a delivery count of 0.
-    */
-   public static ServerJMSStreamMessage createStreamMessage(IDGenerator idGenerator) {
-      return new ServerJMSStreamMessage(newMessage(idGenerator, STREAM_TYPE), 0);
-   }
-
-   /**
-    * Creates an empty message of the default type with a freshly generated id and a delivery
-    * count of 0.
-    */
-   public static ServerJMSMessage createMessage(IDGenerator idGenerator) {
-      return new ServerJMSMessage(newMessage(idGenerator, DEFAULT_TYPE), 0);
-   }
-
-   /**
-    * Creates an empty text message with a freshly generated id and a delivery count of 0.
-    */
-   public static ServerJMSTextMessage createTextMessage(IDGenerator idGenerator) {
-      return new ServerJMSTextMessage(newMessage(idGenerator, TEXT_TYPE), 0);
-   }
-
-   /**
-    * Creates a text message whose body is set to the given text.
-    */
-   public static ServerJMSTextMessage createTextMessage(IDGenerator idGenerator, String text) throws JMSException {
-      ServerJMSTextMessage message = createTextMessage(idGenerator);
-      message.setText(text);
-      return message;
-   }
-
-   /**
-    * Creates an empty object message with a freshly generated id and a delivery count of 0.
-    */
-   public static ServerJMSObjectMessage createObjectMessage(IDGenerator idGenerator) {
-      return new ServerJMSObjectMessage(newMessage(idGenerator, OBJECT_TYPE), 0);
-   }
-
-   /**
-    * Creates an object message carrying the given already-serialized payload.
-    */
-   public static ServerJMSMessage createObjectMessage(IDGenerator idGenerator, Binary serializedForm) throws JMSException {
-      ServerJMSObjectMessage message = createObjectMessage(idGenerator);
-      message.setSerializedForm(serializedForm);
-      return message;
-   }
-
-   /**
-    * Creates an object message whose serialized payload is the given window of {@code array}.
-    */
-   public static ServerJMSMessage createObjectMessage(IDGenerator idGenerator, byte[] array, int offset, int length) throws JMSException {
-      ServerJMSObjectMessage message = createObjectMessage(idGenerator);
-      message.setSerializedForm(new Binary(array, offset, length));
-      return message;
-   }
-
-   /**
-    * Creates an empty map message with a freshly generated id and a delivery count of 0.
-    */
-   public static ServerJMSMapMessage createMapMessage(IDGenerator idGenerator) {
-      return new ServerJMSMapMessage(newMessage(idGenerator, MAP_TYPE), 0);
-   }
-
-   /**
-    * Creates a map message populated from the given content. Proton {@link Binary} values are
-    * converted to a plain {@code byte[]} copy of exactly the bytes the Binary covers.
-    *
-    * @param idGenerator
-    *        source of the new message id
-    * @param content
-    *        entries to copy into the message; must not be null
-    * @return the populated map message
-    * @throws JMSException
-    *         if a value cannot be stored in the map message
-    */
-   public static ServerJMSMapMessage createMapMessage(IDGenerator idGenerator, Map<String, Object> content) throws JMSException {
-      ServerJMSMapMessage message = createMapMessage(idGenerator);
-      final Set<Map.Entry<String, Object>> set = content.entrySet();
-      for (Map.Entry<String, Object> entry : set) {
-         Object value = entry.getValue();
-         if (value instanceof Binary) {
-            Binary binary = (Binary) value;
-            // copyOfRange takes an exclusive END index, not a length, so the offset
-            // must be added to cover the Binary's full window of the backing array.
-            final int start = binary.getArrayOffset();
-            value = Arrays.copyOfRange(binary.getArray(), start, start + binary.getLength());
-         }
-         message.setObject(entry.getKey(), value);
-      }
-      return message;
-   }
-
-   /**
-    * Creates the core message backing the ServerJMS* wrappers built by this class.
-    */
-   private static ServerMessageImpl newMessage(IDGenerator idGenerator, byte messageType) {
-      // NOTE(review): 512 is presumably the initial body buffer size - confirm against
-      // the ServerMessageImpl constructor.
-      ServerMessageImpl message = new ServerMessageImpl(idGenerator.generateID(), 512);
-      message.setType(messageType);
-      // Detaches the body buffer from its owning message; the reason is not visible here.
-      ((ResetLimitWrappedActiveMQBuffer) message.getBodyBuffer()).setMessage(null);
-      return message;
-   }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/669e7cf2/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageTypes.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageTypes.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageTypes.java
deleted file mode 100644
index 70c755a..0000000
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPMessageTypes.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.protocol.amqp.converter.message;
-
-/**
- * Holder for AMQP type marker strings.
- *
- * @deprecated these constants are no longer used by the inbound JMS Transformer and are
- *             scheduled for removal in a future release.
- */
-@Deprecated
-public class AMQPMessageTypes {
-
- // TODO - Remove in future release as these are no longer used by the
- // inbound JMS Transformer.
-
- public static final String AMQP_TYPE_KEY = "amqp:type";
-
- public static final String AMQP_SEQUENCE = "amqp:sequence";
-
- public static final String AMQP_LIST = "amqp:list";
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/669e7cf2/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPNativeInboundTransformer.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPNativeInboundTransformer.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPNativeInboundTransformer.java
deleted file mode 100644
index 7028547..0000000
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPNativeInboundTransformer.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.protocol.amqp.converter.message;
-
-import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMessage;
-import org.apache.activemq.artemis.utils.IDGenerator;
-
-/**
- * Inbound transformer registered under the "native" transformer name. It reuses the raw
- * transform of its parent to carry the encoded bytes and then hands the result, together
- * with the decoded AMQP message, to {@code populateMessage}.
- */
-public class AMQPNativeInboundTransformer extends AMQPRawInboundTransformer {
-
-   public AMQPNativeInboundTransformer(IDGenerator idGenerator) {
-      super(idGenerator);
-   }
-
-   /** @return the name identifying the native transformer. */
-   @Override
-   public String getTransformerName() {
-      return TRANSFORMER_NATIVE;
-   }
-
-   /** @return a new raw transformer sharing this transformer's id generator. */
-   @Override
-   public InboundTransformer getFallbackTransformer() {
-      return new AMQPRawInboundTransformer(idGenerator);
-   }
-
-   /**
-    * Decodes the AMQP message first, then builds the raw bytes message and populates it
-    * from the decoded form.
-    */
-   @Override
-   public ServerJMSMessage transform(EncodedMessage amqpMessage) throws Exception {
-      // Decode before the raw transform so a decode failure happens before a message is created.
-      final org.apache.qpid.proton.message.Message decodedAmqp = amqpMessage.decode();
-      final ServerJMSMessage rawMessage = super.transform(amqpMessage);
-
-      return populateMessage(rawMessage, decodedAmqp);
-   }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/669e7cf2/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPNativeOutboundTransformer.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPNativeOutboundTransformer.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPNativeOutboundTransformer.java
deleted file mode 100644
index 8e89bb3..0000000
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPNativeOutboundTransformer.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.protocol.amqp.converter.message;
-
-import java.io.UnsupportedEncodingException;
-
-import javax.jms.JMSException;
-
-import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSBytesMessage;
-import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMessage;
-import org.apache.activemq.artemis.utils.IDGenerator;
-import org.apache.qpid.proton.amqp.UnsignedInteger;
-import org.apache.qpid.proton.amqp.messaging.Header;
-import org.apache.qpid.proton.codec.WritableBuffer;
-import org.apache.qpid.proton.message.ProtonJMessage;
-
-/**
- * Outbound transformer for messages whose body already holds an encoded AMQP message. The
- * bytes are written out unchanged unless the delivery count has to be patched into the
- * AMQP header.
- */
-public class AMQPNativeOutboundTransformer extends OutboundTransformer {
-
-   public AMQPNativeOutboundTransformer(IDGenerator idGenerator) {
-      super(idGenerator);
-   }
-
-   /**
-    * Writes the AMQP bytes held by the given message into the buffer.
-    *
-    * @param message
-    *        the message to send; anything that is not a ServerJMSBytesMessage (including
-    *        null) is ignored
-    * @param buffer
-    *        the target buffer
-    * @return always 0
-    */
-   @Override
-   public long transform(ServerJMSMessage message, WritableBuffer buffer) throws JMSException, UnsupportedEncodingException {
-      // instanceof is false for null, so no separate null check is required.
-      if (!(message instanceof ServerJMSBytesMessage)) {
-         return 0;
-      }
-
-      return transform(this, (ServerJMSBytesMessage) message, buffer);
-   }
-
-   /**
-    * Copies the body of the bytes message into the buffer, re-encoding it only when the
-    * AMQP delivery-count header must be updated.
-    *
-    * @param options
-    *        the calling transformer (not used by this method)
-    * @param message
-    *        bytes message whose body is the encoded AMQP message; it is reset after reading
-    * @param buffer
-    *        the target buffer
-    * @return always 0
-    * @throws JMSException
-    *         if the message body cannot be read
-    * @throws IllegalStateException
-    *         if the decoder stops consuming bytes before the body is exhausted
-    */
-   public static long transform(OutboundTransformer options, ServerJMSBytesMessage message, WritableBuffer buffer) throws JMSException {
-      byte[] data = new byte[(int) message.getBodyLength()];
-      message.readBytes(data);
-      message.reset();
-
-      // The AMQP delivery-count field only includes prior failed delivery attempts.
-      int amqpDeliveryCount = message.getDeliveryCount() - 1;
-      if (amqpDeliveryCount >= 1) {
-
-         // decode...
-         ProtonJMessage amqp = (ProtonJMessage) org.apache.qpid.proton.message.Message.Factory.create();
-         int offset = 0;
-         int len = data.length;
-         while (len > 0) {
-            final int decoded = amqp.decode(data, offset, len);
-            // Assertions are disabled by default, so enforce progress explicitly;
-            // otherwise a zero-length decode would spin forever.
-            if (decoded <= 0) {
-               throw new IllegalStateException("AMQP decoder made no progress at offset " + offset + " of " + data.length + " bytes");
-            }
-            offset += decoded;
-            len -= decoded;
-         }
-
-         // Update the DeliveryCount header which might require adding a Header
-         if (amqp.getHeader() == null) {
-            amqp.setHeader(new Header());
-         }
-
-         amqp.getHeader().setDeliveryCount(new UnsignedInteger(amqpDeliveryCount));
-
-         amqp.encode(buffer);
-      } else {
-         // Nothing to patch: forward the encoded bytes untouched.
-         buffer.put(data, 0, data.length);
-      }
-
-      return 0;
-   }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/669e7cf2/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPRawInboundTransformer.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPRawInboundTransformer.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPRawInboundTransformer.java
deleted file mode 100644
index 445eaca..0000000
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/AMQPRawInboundTransformer.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.protocol.amqp.converter.message;
-
-import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_MESSAGE_FORMAT;
-import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_NATIVE;
-import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.createBytesMessage;
-
-import javax.jms.DeliveryMode;
-import javax.jms.Message;
-
-import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSBytesMessage;
-import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMessage;
-import org.apache.activemq.artemis.utils.IDGenerator;
-
-/**
- * Inbound transformer that stores the encoded AMQP message, byte for byte, as the body of
- * a bytes message without decoding it.
- */
-public class AMQPRawInboundTransformer extends InboundTransformer {
-
-   public AMQPRawInboundTransformer(IDGenerator idGenerator) {
-      super(idGenerator);
-   }
-
-   /** @return the name identifying the raw transformer. */
-   @Override
-   public String getTransformerName() {
-      return TRANSFORMER_RAW;
-   }
-
-   /** @return null, as there is nothing to fall back to from the full raw transform. */
-   @Override
-   public InboundTransformer getFallbackTransformer() {
-      return null;
-   }
-
-   /**
-    * Builds a bytes message holding the encoded AMQP payload and stamps it with default
-    * JMS headers plus the JMS_AMQP message-format and native marker properties.
-    */
-   @Override
-   public ServerJMSMessage transform(EncodedMessage amqpMessage) throws Exception {
-      final ServerJMSBytesMessage rawCopy = createBytesMessage(idGenerator);
-
-      final byte[] encoded = amqpMessage.getArray();
-      final int start = amqpMessage.getArrayOffset();
-      final int size = amqpMessage.getLength();
-      rawCopy.writeBytes(encoded, start, size);
-
-      // The headers are never decoded on this path, so be conservative and
-      // treat every message as persistent.
-      rawCopy.setJMSDeliveryMode(DeliveryMode.PERSISTENT);
-      rawCopy.setJMSPriority(Message.DEFAULT_PRIORITY);
-      rawCopy.setJMSTimestamp(System.currentTimeMillis());
-
-      rawCopy.setLongProperty(JMS_AMQP_MESSAGE_FORMAT, amqpMessage.getMessageFormat());
-      rawCopy.setBooleanProperty(JMS_AMQP_NATIVE, true);
-
-      return rawCopy;
-   }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/669e7cf2/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/EncodedMessage.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/EncodedMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/EncodedMessage.java
deleted file mode 100644
index 22042da..0000000
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/EncodedMessage.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.protocol.amqp.converter.message;
-
-import org.apache.qpid.proton.amqp.Binary;
-import org.apache.qpid.proton.message.Message;
-
-/**
- * An encoded AMQP message: a window onto a byte array plus the message format it was
- * received with.
- */
-public class EncodedMessage {
-
-   private final Binary data;
-   final long messageFormat;
-
-   /**
-    * @param messageFormat
-    *        the message format value associated with the payload
-    * @param data
-    *        array holding the encoded message (wrapped by a Binary; no copy is made here)
-    * @param offset
-    *        index of the first encoded byte in {@code data}
-    * @param length
-    *        number of encoded bytes
-    */
-   public EncodedMessage(long messageFormat, byte[] data, int offset, int length) {
-      this.data = new Binary(data, offset, length);
-      this.messageFormat = messageFormat;
-   }
-
-   public long getMessageFormat() {
-      return messageFormat;
-   }
-
-   /**
-    * Decodes the encoded bytes into a new Proton message.
-    *
-    * @return the decoded message
-    * @throws IllegalStateException
-    *         if the decoder stops consuming bytes before the payload is exhausted
-    * @throws Exception
-    *         if decoding fails
-    */
-   public Message decode() throws Exception {
-      Message amqp = Message.Factory.create();
-
-      int offset = getArrayOffset();
-      int len = getLength();
-      while (len > 0) {
-         final int decoded = amqp.decode(getArray(), offset, len);
-         // Assertions are disabled by default, so enforce progress explicitly;
-         // otherwise a zero-length decode would spin forever.
-         if (decoded <= 0) {
-            throw new IllegalStateException("AMQP decoder made no progress at offset " + offset + " with " + len + " bytes remaining");
-         }
-         offset += decoded;
-         len -= decoded;
-      }
-
-      return amqp;
-   }
-
-   /** @return the number of encoded bytes. */
-   public int getLength() {
-      return data.getLength();
-   }
-
-   /** @return the index of the first encoded byte within {@link #getArray()}. */
-   public int getArrayOffset() {
-      return data.getArrayOffset();
-   }
-
-   /** @return the backing array; only the offset/length window belongs to this message. */
-   public byte[] getArray() {
-      return data.getArray();
-   }
-
-   @Override
-   public String toString() {
-      return data.toString();
-   }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/669e7cf2/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/InboundTransformer.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/InboundTransformer.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/InboundTransformer.java
deleted file mode 100644
index 1316ab7..0000000
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/InboundTransformer.java
+++ /dev/null
@@ -1,243 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.protocol.amqp.converter.message;
-
-import static org.apache.activemq.artemis.api.core.Message.HDR_SCHEDULED_DELIVERY_TIME;
-import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_CONTENT_ENCODING;
-import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_CONTENT_TYPE;
-import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_FIRST_ACQUIRER;
-import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_FOOTER_PREFIX;
-import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_HEADER;
-import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_HEADER_DURABLE;
-import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_HEADER_PRIORITY;
-import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_MESSAGE_ANNOTATION_PREFIX;
-import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_REPLYTO_GROUP_ID;
-
-import java.nio.charset.StandardCharsets;
-import java.util.Map;
-import java.util.Set;
-
-import javax.jms.DeliveryMode;
-import javax.jms.JMSException;
-import javax.jms.Message;
-
-import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerDestination;
-import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMessage;
-import org.apache.activemq.artemis.utils.IDGenerator;
-import org.apache.qpid.proton.amqp.Binary;
-import org.apache.qpid.proton.amqp.Decimal128;
-import org.apache.qpid.proton.amqp.Decimal32;
-import org.apache.qpid.proton.amqp.Decimal64;
-import org.apache.qpid.proton.amqp.Symbol;
-import org.apache.qpid.proton.amqp.UnsignedByte;
-import org.apache.qpid.proton.amqp.UnsignedInteger;
-import org.apache.qpid.proton.amqp.UnsignedLong;
-import org.apache.qpid.proton.amqp.UnsignedShort;
-import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
-import org.apache.qpid.proton.amqp.messaging.Footer;
-import org.apache.qpid.proton.amqp.messaging.Header;
-import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
-import org.apache.qpid.proton.amqp.messaging.Properties;
-
-/**
- * Base class for transformers that turn an encoded AMQP message into a server-side JMS message.
- * Provides the shared mapping of the AMQP header, annotations, properties and footer onto JMS
- * headers and properties.
- */
-public abstract class InboundTransformer {
-
-   protected IDGenerator idGenerator;
-
-   public static final String TRANSFORMER_NATIVE = "native";
-   public static final String TRANSFORMER_RAW = "raw";
-   public static final String TRANSFORMER_JMS = "jms";
-
-   public InboundTransformer(IDGenerator idGenerator) {
-      this.idGenerator = idGenerator;
-   }
-
-   public abstract ServerJMSMessage transform(EncodedMessage amqpMessage) throws Exception;
-
-   public abstract String getTransformerName();
-
-   public abstract InboundTransformer getFallbackTransformer();
-
-   /**
-    * Copies the sections of the AMQP message onto the given JMS message, in this order: header,
-    * message annotations, application properties, properties, time-to-live, footer.
-    *
-    * @param jms  the message to populate
-    * @param amqp the decoded AMQP message to read from
-    * @return the same {@code jms} instance that was passed in
-    */
-   protected ServerJMSMessage populateMessage(ServerJMSMessage jms, org.apache.qpid.proton.message.Message amqp) throws Exception {
-      final Header header = amqp.getHeader();
-      copyHeader(jms, header);
-      copyMessageAnnotations(jms, amqp.getMessageAnnotations());
-      copyApplicationProperties(jms, amqp.getApplicationProperties());
-      copyProperties(jms, amqp.getProperties());
-      applyTimeToLive(jms, header);
-      copyFooter(jms, amqp.getFooter());
-      return jms;
-   }
-
-   /** Maps durability, priority, first-acquirer and delivery count; applies defaults when absent. */
-   private void copyHeader(ServerJMSMessage jms, Header header) throws Exception {
-      if (header == null) {
-         jms.setJMSPriority((byte) Message.DEFAULT_PRIORITY);
-         jms.setJMSDeliveryMode(DeliveryMode.NON_PERSISTENT);
-         return;
-      }
-
-      jms.setBooleanProperty(JMS_AMQP_HEADER, true);
-
-      final Boolean durable = header.getDurable();
-      if (durable == null) {
-         jms.setJMSDeliveryMode(DeliveryMode.NON_PERSISTENT);
-      } else {
-         jms.setBooleanProperty(JMS_AMQP_HEADER_DURABLE, true);
-         jms.setJMSDeliveryMode(durable.booleanValue() ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
-      }
-
-      final UnsignedByte priority = header.getPriority();
-      if (priority == null) {
-         jms.setJMSPriority(Message.DEFAULT_PRIORITY);
-      } else {
-         jms.setBooleanProperty(JMS_AMQP_HEADER_PRIORITY, true);
-         jms.setJMSPriority(priority.intValue());
-      }
-
-      final Boolean firstAcquirer = header.getFirstAcquirer();
-      if (firstAcquirer != null) {
-         jms.setBooleanProperty(JMS_AMQP_FIRST_ACQUIRER, firstAcquirer);
-      }
-
-      final UnsignedInteger deliveryCount = header.getDeliveryCount();
-      if (deliveryCount != null) {
-         // The AMQP count covers failed deliveries only; the JMS count also includes the
-         // original delivery, hence the +1.
-         jms.setLongProperty("JMSXDeliveryCount", deliveryCount.longValue() + 1);
-      }
-   }
-
-   /**
-    * Stores every message annotation as a prefixed property. The delivery-time and
-    * delivery-delay annotations additionally set the scheduled delivery time.
-    */
-   private void copyMessageAnnotations(ServerJMSMessage jms, MessageAnnotations annotations) throws Exception {
-      if (annotations == null) {
-         return;
-      }
-
-      for (Map.Entry<?, ?> annotation : annotations.getValue().entrySet()) {
-         final String name = annotation.getKey().toString();
-         final Object content = annotation.getValue();
-
-         if (content != null) {
-            if ("x-opt-delivery-time".equals(name)) {
-               jms.setLongProperty(HDR_SCHEDULED_DELIVERY_TIME.toString(), ((Number) content).longValue());
-            } else if ("x-opt-delivery-delay".equals(name)) {
-               final long delayMillis = ((Number) content).longValue();
-               if (delayMillis > 0) {
-                  jms.setLongProperty(HDR_SCHEDULED_DELIVERY_TIME.toString(), System.currentTimeMillis() + delayMillis);
-               }
-            }
-         }
-
-         setProperty(jms, JMS_AMQP_MESSAGE_ANNOTATION_PREFIX + name, content);
-      }
-   }
-
-   /** Stores each application property under its own key. */
-   @SuppressWarnings("unchecked")
-   private void copyApplicationProperties(ServerJMSMessage jms, ApplicationProperties applicationProperties) throws Exception {
-      if (applicationProperties == null) {
-         return;
-      }
-
-      final Set<Map.Entry<Object, Object>> entries = (Set<Map.Entry<Object, Object>>) applicationProperties.getValue().entrySet();
-      for (Map.Entry<Object, Object> property : entries) {
-         setProperty(jms, property.getKey().toString(), property.getValue());
-      }
-   }
-
-   /** Maps each field of the AMQP properties section that is present onto the JMS message. */
-   private void copyProperties(ServerJMSMessage jms, Properties properties) throws Exception {
-      if (properties == null) {
-         return;
-      }
-
-      final Object messageId = properties.getMessageId();
-      if (messageId != null) {
-         jms.setJMSMessageID(AMQPMessageIdHelper.INSTANCE.toBaseMessageIdString(messageId));
-      }
-
-      final Binary userId = properties.getUserId();
-      if (userId != null) {
-         // TODO - Better Way to set this?
-         final String userIdText = new String(userId.getArray(), userId.getArrayOffset(), userId.getLength(), StandardCharsets.UTF_8);
-         jms.setStringProperty("JMSXUserID", userIdText);
-      }
-
-      final String to = properties.getTo();
-      if (to != null) {
-         jms.setJMSDestination(new ServerDestination(to));
-      }
-
-      final String subject = properties.getSubject();
-      if (subject != null) {
-         jms.setJMSType(subject);
-      }
-
-      final String replyTo = properties.getReplyTo();
-      if (replyTo != null) {
-         jms.setJMSReplyTo(new ServerDestination(replyTo));
-      }
-
-      final Object correlationId = properties.getCorrelationId();
-      if (correlationId != null) {
-         jms.setJMSCorrelationID(AMQPMessageIdHelper.INSTANCE.toBaseMessageIdString(correlationId));
-      }
-
-      final Symbol contentType = properties.getContentType();
-      if (contentType != null) {
-         jms.setStringProperty(JMS_AMQP_CONTENT_TYPE, contentType.toString());
-      }
-
-      final Symbol contentEncoding = properties.getContentEncoding();
-      if (contentEncoding != null) {
-         jms.setStringProperty(JMS_AMQP_CONTENT_ENCODING, contentEncoding.toString());
-      }
-
-      if (properties.getCreationTime() != null) {
-         jms.setJMSTimestamp(properties.getCreationTime().getTime());
-      }
-
-      final String groupId = properties.getGroupId();
-      if (groupId != null) {
-         jms.setStringProperty("_AMQ_GROUP_ID", groupId);
-      }
-
-      final UnsignedInteger groupSequence = properties.getGroupSequence();
-      if (groupSequence != null) {
-         jms.setIntProperty("JMSXGroupSeq", groupSequence.intValue());
-      }
-
-      final String replyToGroupId = properties.getReplyToGroupId();
-      if (replyToGroupId != null) {
-         jms.setStringProperty(JMS_AMQP_REPLYTO_GROUP_ID, replyToGroupId);
-      }
-
-      if (properties.getAbsoluteExpiryTime() != null) {
-         jms.setJMSExpiration(properties.getAbsoluteExpiryTime().getTime());
-      }
-   }
-
-   /**
-    * Derives the JMS expiration from the header ttl, but only when a header exists and no
-    * expiration has been set on the message so far.
-    */
-   private void applyTimeToLive(ServerJMSMessage jms, Header header) throws Exception {
-      if (header == null || jms.getJMSExpiration() != 0) {
-         return;
-      }
-
-      final UnsignedInteger headerTtl = header.getTtl();
-      final long ttl = headerTtl != null ? headerTtl.longValue() : Message.DEFAULT_TIME_TO_LIVE;
-
-      if (ttl != 0) {
-         jms.setJMSExpiration(System.currentTimeMillis() + ttl);
-      } else {
-         jms.setJMSExpiration(0);
-      }
-   }
-
-   /** Stores each footer entry as a prefixed property. */
-   @SuppressWarnings("unchecked")
-   private void copyFooter(ServerJMSMessage jms, Footer footer) throws Exception {
-      if (footer == null) {
-         return;
-      }
-
-      final Set<Map.Entry<Object, Object>> entries = (Set<Map.Entry<Object, Object>>) footer.getValue().entrySet();
-      for (Map.Entry<Object, Object> item : entries) {
-         setProperty(jms, JMS_AMQP_FOOTER_PREFIX + item.getKey().toString(), item.getValue());
-      }
-   }
-
-   /**
-    * Sets a single property, first converting AMQP-specific value types: unsigned numbers go to
-    * the narrowest signed property type that holds them (widening when needed), symbols and
-    * binaries become strings, decimals become float/double. Anything else is set as an object.
-    */
-   private void setProperty(Message msg, String key, Object value) throws JMSException {
-      if (value instanceof UnsignedLong) {
-         msg.setLongProperty(key, ((UnsignedLong) value).longValue());
-         return;
-      }
-
-      if (value instanceof UnsignedInteger) {
-         final long wide = ((UnsignedInteger) value).longValue();
-         if (wide < Integer.MIN_VALUE || wide > Integer.MAX_VALUE) {
-            msg.setLongProperty(key, wide);
-         } else {
-            msg.setIntProperty(key, (int) wide);
-         }
-         return;
-      }
-
-      if (value instanceof UnsignedShort) {
-         final int wide = ((UnsignedShort) value).intValue();
-         if (wide < Short.MIN_VALUE || wide > Short.MAX_VALUE) {
-            msg.setIntProperty(key, wide);
-         } else {
-            msg.setShortProperty(key, (short) wide);
-         }
-         return;
-      }
-
-      if (value instanceof UnsignedByte) {
-         final short wide = ((UnsignedByte) value).shortValue();
-         if (wide < Byte.MIN_VALUE || wide > Byte.MAX_VALUE) {
-            msg.setShortProperty(key, wide);
-         } else {
-            msg.setByteProperty(key, (byte) wide);
-         }
-         return;
-      }
-
-      if (value instanceof Symbol || value instanceof Binary) {
-         msg.setStringProperty(key, value.toString());
-         return;
-      }
-
-      if (value instanceof Decimal128) {
-         msg.setDoubleProperty(key, ((Decimal128) value).doubleValue());
-         return;
-      }
-
-      if (value instanceof Decimal64) {
-         msg.setDoubleProperty(key, ((Decimal64) value).doubleValue());
-         return;
-      }
-
-      if (value instanceof Decimal32) {
-         msg.setFloatProperty(key, ((Decimal32) value).floatValue());
-         return;
-      }
-
-      msg.setObjectProperty(key, value);
-   }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/669e7cf2/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingInboundTransformer.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingInboundTransformer.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingInboundTransformer.java
deleted file mode 100644
index 629c499..0000000
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingInboundTransformer.java
+++ /dev/null
@@ -1,196 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.protocol.amqp.converter.message;
-
-import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_DATA;
-import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_NULL;
-import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_SEQUENCE;
-import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_VALUE_BINARY;
-import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_VALUE_LIST;
-import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_VALUE_MAP;
-import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_VALUE_NULL;
-import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_VALUE_STRING;
-import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_MESSAGE_FORMAT;
-import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_ORIGINAL_ENCODING;
-import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.OCTET_STREAM_CONTENT_TYPE;
-import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE;
-import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.createBytesMessage;
-import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.createMapMessage;
-import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.createMessage;
-import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.createObjectMessage;
-import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.createStreamMessage;
-import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.createTextMessage;
-import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.getCharsetForTextualContent;
-import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.isContentType;
-
-import java.nio.ByteBuffer;
-import java.nio.CharBuffer;
-import java.nio.charset.CharacterCodingException;
-import java.nio.charset.Charset;
-import java.nio.charset.StandardCharsets;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMessage;
-import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSStreamMessage;
-import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInternalErrorException;
-import org.apache.activemq.artemis.utils.IDGenerator;
-import org.apache.qpid.proton.amqp.Binary;
-import org.apache.qpid.proton.amqp.messaging.AmqpSequence;
-import org.apache.qpid.proton.amqp.messaging.AmqpValue;
-import org.apache.qpid.proton.amqp.messaging.Data;
-import org.apache.qpid.proton.amqp.messaging.Section;
-import org.apache.qpid.proton.message.Message;
-
-/**
- * Inbound transformer that maps a decoded AMQP message onto the JMS message type matching its
- * body section, falling back to the chain of fallback transformers when that mapping fails.
- */
-public class JMSMappingInboundTransformer extends InboundTransformer {
-
-   public JMSMappingInboundTransformer(IDGenerator idGenerator) {
-      super(idGenerator);
-   }
-
-   @Override
-   public String getTransformerName() {
-      return TRANSFORMER_JMS;
-   }
-
-   @Override
-   public InboundTransformer getFallbackTransformer() {
-      return new AMQPNativeInboundTransformer(idGenerator);
-   }
-
-   /**
-    * Decodes and maps the message. On any failure the fallback transformers are tried in turn
-    * until one succeeds or the chain ends.
-    *
-    * NOTE: if mapping fails after the message object was created and every fallback fails too,
-    * the partially populated message from the first attempt is what gets returned.
-    *
-    * @return the transformed message, or {@code null} if nothing could be produced
-    */
-   @Override
-   public ServerJMSMessage transform(EncodedMessage encodedMessage) throws Exception {
-      ServerJMSMessage outcome = null;
-
-      try {
-         final Message decoded = encodedMessage.decode();
-         outcome = createServerMessage(decoded);
-         populateMessage(outcome, decoded);
-      } catch (Exception mappingFailure) {
-         for (InboundTransformer candidate = this.getFallbackTransformer(); candidate != null; candidate = candidate.getFallbackTransformer()) {
-            try {
-               outcome = candidate.transform(encodedMessage);
-               break;
-            } catch (Exception fallbackFailure) {
-               // Move on to the next transformer in the chain.
-            }
-         }
-      }
-
-      // Whichever transformer produced the message, keep the AMQP message format value so it
-      // can be applied again on retransmit.
-      if (outcome != null) {
-         final long format = encodedMessage.getMessageFormat();
-         if (format != 0) {
-            outcome.setLongProperty(JMS_AMQP_MESSAGE_FORMAT, format);
-         }
-      }
-
-      return outcome;
-   }
-
-   /** Picks the JMS message type from the kind of body section carried by the AMQP message. */
-   private ServerJMSMessage createServerMessage(Message message) throws Exception {
-      final Section body = message.getBody();
-
-      if (body == null) {
-         return fromMissingBody(message);
-      }
-      if (body instanceof Data) {
-         return fromData(message, (Data) body);
-      }
-      if (body instanceof AmqpSequence) {
-         return fromSequence((AmqpSequence) body);
-      }
-      if (body instanceof AmqpValue) {
-         return fromValue(message, ((AmqpValue) body).getValue());
-      }
-
-      throw new RuntimeException("Unexpected body type: " + body.getClass());
-   }
-
-   /** No body section: the message type is chosen from the content type alone. */
-   private ServerJMSMessage fromMissingBody(Message message) throws Exception {
-      final ServerJMSMessage created;
-
-      if (isContentType(SERIALIZED_JAVA_OBJECT_CONTENT_TYPE, message)) {
-         created = createObjectMessage(idGenerator);
-      } else if (isContentType(OCTET_STREAM_CONTENT_TYPE, message) || isContentType(null, message)) {
-         created = createBytesMessage(idGenerator);
-      } else if (getCharsetForTextualContent(message.getContentType()) != null) {
-         created = createTextMessage(idGenerator);
-      } else {
-         created = createMessage(idGenerator);
-      }
-
-      created.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_NULL);
-      return created;
-   }
-
-   /**
-    * Data section: object or bytes message according to the content type; UTF-8 textual content
-    * becomes a text message when it decodes cleanly, everything else a bytes message.
-    */
-   private ServerJMSMessage fromData(Message message, Data body) throws Exception {
-      final Binary payload = body.getValue();
-      ServerJMSMessage created;
-
-      if (isContentType(SERIALIZED_JAVA_OBJECT_CONTENT_TYPE, message)) {
-         created = createObjectMessage(idGenerator, payload.getArray(), payload.getArrayOffset(), payload.getLength());
-      } else if (isContentType(OCTET_STREAM_CONTENT_TYPE, message)) {
-         created = bytesMessageOf(payload);
-      } else {
-         final Charset charset = getCharsetForTextualContent(message.getContentType());
-         if (!StandardCharsets.UTF_8.equals(charset)) {
-            created = bytesMessageOf(payload);
-         } else {
-            final ByteBuffer raw = ByteBuffer.wrap(payload.getArray(), payload.getArrayOffset(), payload.getLength());
-            try {
-               final CharBuffer text = charset.newDecoder().decode(raw);
-               created = createTextMessage(idGenerator, String.valueOf(text));
-            } catch (CharacterCodingException undecodable) {
-               created = bytesMessageOf(payload);
-            }
-         }
-      }
-
-      created.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_DATA);
-      return created;
-   }
-
-   /** AMQP sequence: each element is written into a stream message. */
-   private ServerJMSMessage fromSequence(AmqpSequence sequence) throws Exception {
-      final ServerJMSStreamMessage stream = createStreamMessage(idGenerator);
-      for (Object element : sequence.getValue()) {
-         stream.writeObject(element);
-      }
-
-      final ServerJMSMessage created = stream;
-      created.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_SEQUENCE);
-      return created;
-   }
-
-   /** AMQP value: the message type follows the Java type of the carried value. */
-   @SuppressWarnings("unchecked")
-   private ServerJMSMessage fromValue(Message message, Object value) throws Exception {
-      final ServerJMSMessage created;
-
-      if (value == null || value instanceof String) {
-         created = createTextMessage(idGenerator, (String) value);
-         created.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, value == null ? AMQP_VALUE_NULL : AMQP_VALUE_STRING);
-      } else if (value instanceof Binary) {
-         final Binary payload = (Binary) value;
-         if (isContentType(SERIALIZED_JAVA_OBJECT_CONTENT_TYPE, message)) {
-            created = createObjectMessage(idGenerator, payload);
-         } else {
-            created = bytesMessageOf(payload);
-         }
-         created.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_VALUE_BINARY);
-      } else if (value instanceof List) {
-         final ServerJMSStreamMessage stream = createStreamMessage(idGenerator);
-         for (Object element : (List<Object>) value) {
-            stream.writeObject(element);
-         }
-         created = stream;
-         created.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_VALUE_LIST);
-      } else if (value instanceof Map) {
-         created = createMapMessage(idGenerator, (Map<String, Object>) value);
-         created.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_VALUE_MAP);
-      } else {
-         // Trigger fall-back to native encoder which generates BytesMessage with the
-         // original message stored in the message body.
-         throw new ActiveMQAMQPInternalErrorException("Unable to encode to ActiveMQ JMS Message");
-      }
-
-      return created;
-   }
-
-   /** Builds a bytes message from the region of the backing array described by the binary. */
-   private ServerJMSMessage bytesMessageOf(Binary payload) throws Exception {
-      return createBytesMessage(idGenerator, payload.getArray(), payload.getArrayOffset(), payload.getLength());
-   }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/669e7cf2/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformer.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformer.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformer.java
deleted file mode 100644
index 7dbc6d4..0000000
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformer.java
+++ /dev/null
@@ -1,592 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.protocol.amqp.converter.message;
-
-import static org.apache.activemq.artemis.api.core.FilterConstants.NATIVE_MESSAGE_ID;
-import static org.apache.activemq.artemis.api.core.Message.HDR_SCHEDULED_DELIVERY_TIME;
-import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_DATA;
-import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_NULL;
-import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_SEQUENCE;
-import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_UNKNOWN;
-import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_VALUE_BINARY;
-import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_VALUE_LIST;
-import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.AMQP_VALUE_STRING;
-import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.EMPTY_BINARY;
-import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_CONTENT_ENCODING;
-import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_CONTENT_TYPE;
-import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_DELIVERY_ANNOTATION_PREFIX;
-import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_FIRST_ACQUIRER;
-import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_FOOTER_PREFIX;
-import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_HEADER;
-import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_HEADER_DURABLE;
-import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_HEADER_PRIORITY;
-import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_MESSAGE_ANNOTATION_PREFIX;
-import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_MESSAGE_FORMAT;
-import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_NATIVE;
-import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_ORIGINAL_ENCODING;
-import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_PREFIX;
-import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_PROPERTIES;
-import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.JMS_AMQP_REPLYTO_GROUP_ID;
-import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE;
-import static org.apache.activemq.artemis.protocol.amqp.converter.message.AMQPMessageSupport.toAddress;
-
-import java.io.UnsupportedEncodingException;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.Enumeration;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.Map;
-import java.util.Set;
-
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageEOFException;
-import javax.jms.Queue;
-import javax.jms.TemporaryQueue;
-import javax.jms.TemporaryTopic;
-import javax.jms.TextMessage;
-import javax.jms.Topic;
-
-import org.apache.activemq.artemis.core.message.impl.MessageInternal;
-import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSBytesMessage;
-import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMapMessage;
-import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMessage;
-import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSObjectMessage;
-import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSStreamMessage;
-import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSTextMessage;
-import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPIllegalStateException;
-import org.apache.activemq.artemis.reader.MessageUtil;
-import org.apache.activemq.artemis.utils.IDGenerator;
-import org.apache.qpid.proton.amqp.Binary;
-import org.apache.qpid.proton.amqp.Symbol;
-import org.apache.qpid.proton.amqp.UnsignedByte;
-import org.apache.qpid.proton.amqp.UnsignedInteger;
-import org.apache.qpid.proton.amqp.messaging.AmqpSequence;
-import org.apache.qpid.proton.amqp.messaging.AmqpValue;
-import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
-import org.apache.qpid.proton.amqp.messaging.Data;
-import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations;
-import org.apache.qpid.proton.amqp.messaging.Footer;
-import org.apache.qpid.proton.amqp.messaging.Header;
-import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
-import org.apache.qpid.proton.amqp.messaging.Properties;
-import org.apache.qpid.proton.amqp.messaging.Section;
-import org.apache.qpid.proton.codec.AMQPDefinedTypes;
-import org.apache.qpid.proton.codec.DecoderImpl;
-import org.apache.qpid.proton.codec.EncoderImpl;
-import org.apache.qpid.proton.codec.WritableBuffer;
-import org.jboss.logging.Logger;
-
-public class JMSMappingOutboundTransformer extends OutboundTransformer {
-
- // Logger for this transformer class.
- private static final Logger logger = Logger.getLogger(JMSMappingOutboundTransformer.class);
-
- // Message annotation keys under which transform() stores the result of destinationType(...)
- // for the JMS destination and for the JMS reply-to destination respectively.
- public static final Symbol JMS_DEST_TYPE_MSG_ANNOTATION = Symbol.valueOf("x-opt-jms-dest");
- public static final Symbol JMS_REPLY_TO_TYPE_MSG_ANNOTATION = Symbol.valueOf("x-opt-jms-reply-to");
-
- // Destination kind codes. NOTE(review): presumably the values stored under the two
- // annotation keys above; confirm against destinationType(...).
- public static final byte QUEUE_TYPE = 0x00;
- public static final byte TOPIC_TYPE = 0x01;
- public static final byte TEMP_QUEUE_TYPE = 0x02;
- public static final byte TEMP_TOPIC_TYPE = 0x03;
-
-   // Proton currently needs a decoder instance in order to construct an encoder, so the two
-   // are created and kept together.
-   private static class EncoderDecoderPair {
-      DecoderImpl decoder;
-      EncoderImpl encoder;
-
-      EncoderDecoderPair() {
-         decoder = new DecoderImpl();
-         encoder = new EncoderImpl(decoder);
-         // Register the AMQP defined types with the freshly created pair.
-         AMQPDefinedTypes.registerAllTypes(decoder, encoder);
-      }
-   }
-
- // Per-thread codec: each thread gets its own EncoderDecoderPair, created on first access.
- private static final ThreadLocal<EncoderDecoderPair> tlsCodec = new ThreadLocal<EncoderDecoderPair>() {
- @Override
- protected EncoderDecoderPair initialValue() {
- return new EncoderDecoderPair();
- }
- };
-
- // Passes the id generator straight through to the OutboundTransformer superclass.
- public JMSMappingOutboundTransformer(IDGenerator idGenerator) {
- super(idGenerator);
- }
-
- @Override
- public long transform(ServerJMSMessage message, WritableBuffer buffer) throws JMSException, UnsupportedEncodingException {
- if (message == null) {
- return 0;
- }
-
- long messageFormat = 0;
- Header header = null;
- Properties properties = null;
- Map<Symbol, Object> daMap = null;
- Map<Symbol, Object> maMap = null;
- Map<String, Object> apMap = null;
- Map<Object, Object> footerMap = null;
-
- Section body = convertBody(message);
-
- if (message.getInnerMessage().isDurable()) {
- if (header == null) {
- header = new Header();
- }
- header.setDurable(true);
- }
- byte priority = (byte) message.getJMSPriority();
- if (priority != Message.DEFAULT_PRIORITY) {
- if (header == null) {
- header = new Header();
- }
- header.setPriority(UnsignedByte.valueOf(priority));
- }
- String type = message.getJMSType();
- if (type != null) {
- if (properties == null) {
- properties = new Properties();
- }
- properties.setSubject(type);
- }
- String messageId = message.getJMSMessageID();
- if (messageId != null) {
- if (properties == null) {
- properties = new Properties();
- }
- try {
- properties.setMessageId(AMQPMessageIdHelper.INSTANCE.toIdObject(messageId));
- } catch (ActiveMQAMQPIllegalStateException e) {
- properties.setMessageId(messageId);
- }
- }
- Destination destination = message.getJMSDestination();
- if (destination != null) {
- if (properties == null) {
- properties = new Properties();
- }
- properties.setTo(toAddress(destination));
- if (maMap == null) {
- maMap = new HashMap<>();
- }
- maMap.put(JMS_DEST_TYPE_MSG_ANNOTATION, destinationType(destination));
- }
- Destination replyTo = message.getJMSReplyTo();
- if (replyTo != null) {
- if (properties == null) {
- properties = new Properties();
- }
- properties.setReplyTo(toAddress(replyTo));
- if (maMap == null) {
- maMap = new HashMap<>();
- }
- maMap.put(JMS_REPLY_TO_TYPE_MSG_ANNOTATION, destinationType(replyTo));
- }
- String correlationId = message.getJMSCorrelationID();
- if (correlationId != null) {
- if (properties == null) {
- properties = new Properties();
- }
- try {
- properties.setCorrelationId(AMQPMessageIdHelper.INSTANCE.toIdObject(correlationId));
- } catch (ActiveMQAMQPIllegalStateException e) {
- properties.setCorrelationId(correlationId);
- }
- }
- long expiration = message.getJMSExpiration();
- if (expiration != 0) {
- long ttl = expiration - System.currentTimeMillis();
- if (ttl < 0) {
- ttl = 1;
- }
-
- if (header == null) {
- header = new Header();
- }
- header.setTtl(new UnsignedInteger((int) ttl));
-
- if (properties == null) {
- properties = new Properties();
- }
- properties.setAbsoluteExpiryTime(new Date(expiration));
- }
- long timeStamp = message.getJMSTimestamp();
- if (timeStamp != 0) {
- if (properties == null) {
- properties = new Properties();
- }
- properties.setCreationTime(new Date(timeStamp));
- }
-
- final Set<String> keySet = MessageUtil.getPropertyNames(message.getInnerMessage());
- for (String key : keySet) {
- if (key.startsWith("JMSX")) {
- if (key.equals("JMSXDeliveryCount")) {
- // The AMQP delivery-count field only includes prior failed delivery attempts,
- // whereas JMSXDeliveryCount includes the first/current delivery attempt.
- int amqpDeliveryCount = message.getDeliveryCount() - 1;
- if (amqpDeliveryCount > 0) {
- if (header == null) {
- header = new Header();
- }
- header.setDeliveryCount(new UnsignedInteger(amqpDeliveryCount));
- }
- continue;
- } else if (key.equals("JMSXUserID")) {
- String value = message.getStringProperty(key);
- if (properties == null) {
- properties = new Properties();
- }
- properties.setUserId(new Binary(value.getBytes(StandardCharsets.UTF_8)));
- continue;
- } else if (key.equals("JMSXGroupID")) {
- String value = message.getStringProperty(key);
- if (properties == null) {
- properties = new Properties();
- }
- properties.setGroupId(value);
- continue;
- } else if (key.equals("JMSXGroupSeq")) {
- UnsignedInteger value = new UnsignedInteger(message.getIntProperty(key));
- if (properties == null) {
- properties = new Properties();
- }
- properties.setGroupSequence(value);
- continue;
- }
- } else if (key.startsWith(JMS_AMQP_PREFIX)) {
- // AMQP Message Information stored from a conversion to the Core Message
- if (key.equals(JMS_AMQP_MESSAGE_FORMAT)) {
- messageFormat = message.getLongProperty(JMS_AMQP_MESSAGE_FORMAT);
- continue;
- } else if (key.equals(JMS_AMQP_NATIVE)) {
- // skip..internal use only
- continue;
- } else if (key.equals(JMS_AMQP_ORIGINAL_ENCODING)) {
- // skip..internal use only
- continue;
- } else if (key.equals(JMS_AMQP_FIRST_ACQUIRER)) {
- if (header == null) {
- header = new Header();
- }
- header.setFirstAcquirer(message.getBooleanProperty(key));
- continue;
- } else if (key.equals(JMS_AMQP_HEADER)) {
- if (header == null) {
- header = new Header();
- }
- continue;
- } else if (key.equals(JMS_AMQP_HEADER_DURABLE)) {
- if (header == null) {
- header = new Header();
- }
- header.setDurable(message.getInnerMessage().isDurable());
- continue;
- } else if (key.equals(JMS_AMQP_HEADER_PRIORITY)) {
- if (header == null) {
- header = new Header();
- }
- header.setPriority(UnsignedByte.valueOf(priority));
- continue;
- } else if (key.startsWith(JMS_AMQP_PROPERTIES)) {
- if (properties == null) {
- properties = new Properties();
- }
- continue;
- } else if (key.startsWith(JMS_AMQP_DELIVERY_ANNOTATION_PREFIX)) {
- if (daMap == null) {
- daMap = new HashMap<>();
- }
- String name = key.substring(JMS_AMQP_DELIVERY_ANNOTATION_PREFIX.length());
- daMap.put(Symbol.valueOf(name), message.getObjectProperty(key));
- continue;
- } else if (key.startsWith(JMS_AMQP_MESSAGE_ANNOTATION_PREFIX)) {
- if (maMap == null) {
- maMap = new HashMap<>();
- }
- String name = key.substring(JMS_AMQP_MESSAGE_ANNOTATION_PREFIX.length());
- maMap.put(Symbol.valueOf(name), message.getObjectProperty(key));
- continue;
- } else if (key.equals(JMS_AMQP_CONTENT_TYPE)) {
- if (properties == null) {
- properties = new Properties();
- }
- properties.setContentType(Symbol.getSymbol(message.getStringProperty(key)));
- continue;
- } else if (key.equals(JMS_AMQP_CONTENT_ENCODING)) {
- if (properties == null) {
- properties = new Properties();
- }
- properties.setContentEncoding(Symbol.getSymbol(message.getStringProperty(key)));
- continue;
- } else if (key.equals(JMS_AMQP_REPLYTO_GROUP_ID)) {
- if (properties == null) {
- properties = new Properties();
- }
- properties.setReplyToGroupId(message.getStringProperty(key));
- continue;
- } else if (key.startsWith(JMS_AMQP_FOOTER_PREFIX)) {
- if (footerMap == null) {
- footerMap = new HashMap<>();
- }
- String name = key.substring(JMS_AMQP_FOOTER_PREFIX.length());
- footerMap.put(name, message.getObjectProperty(key));
- continue;
- }
- } else if (key.equals("_AMQ_GROUP_ID")) {
- String value = message.getStringProperty(key);
- if (properties == null) {
- properties = new Properties();
- }
- properties.setGroupId(value);
- continue;
- } else if (key.equals(NATIVE_MESSAGE_ID)) {
- // skip..internal use only
- continue;
- } else if (key.endsWith(HDR_SCHEDULED_DELIVERY_TIME.toString())) {
- // skip..remove annotation from previous inbound transformation
- continue;
- } else if (key.equals(AMQPMessageTypes.AMQP_TYPE_KEY)) {
- // skip..internal use only - TODO - Remove this deprecated value in future release.
- continue;
- }
-
- if (apMap == null) {
- apMap = new HashMap<>();
- }
-
- Object objectProperty = message.getObjectProperty(key);
- if (objectProperty instanceof byte[]) {
- objectProperty = new Binary((byte[]) objectProperty);
- }
-
- apMap.put(key, objectProperty);
- }
-
- EncoderImpl encoder = tlsCodec.get().encoder;
- encoder.setByteBuffer(buffer);
-
- if (header != null) {
- encoder.writeObject(header);
- }
- if (daMap != null) {
- encoder.writeObject(new DeliveryAnnotations(daMap));
- }
- if (maMap != null) {
- encoder.writeObject(new MessageAnnotations(maMap));
- }
- if (properties != null) {
- encoder.writeObject(properties);
- }
- if (apMap != null) {
- encoder.writeObject(new ApplicationProperties(apMap));
- }
- if (body != null) {
- encoder.writeObject(body);
- }
- if (footerMap != null) {
- encoder.writeObject(new Footer(footerMap));
- }
-
- return messageFormat;
- }
-
- private Section convertBody(ServerJMSMessage message) throws JMSException {
-
- Section body = null;
- short orignalEncoding = AMQP_UNKNOWN;
-
- try {
- orignalEncoding = message.getShortProperty(JMS_AMQP_ORIGINAL_ENCODING);
- } catch (Exception ex) {
- // Ignore and stick with UNKNOWN
- }
-
- if (message instanceof ServerJMSBytesMessage) {
- Binary payload = getBinaryFromMessageBody((ServerJMSBytesMessage) message);
-
- if (payload == null) {
- payload = EMPTY_BINARY;
- }
-
- switch (orignalEncoding) {
- case AMQP_NULL:
- break;
- case AMQP_VALUE_BINARY:
- body = new AmqpValue(payload);
- break;
- case AMQP_DATA:
- case AMQP_UNKNOWN:
- default:
- body = new Data(payload);
- break;
- }
- } else if (message instanceof ServerJMSTextMessage) {
- switch (orignalEncoding) {
- case AMQP_NULL:
- break;
- case AMQP_DATA:
- body = new Data(getBinaryFromMessageBody((ServerJMSTextMessage) message));
- break;
- case AMQP_VALUE_STRING:
- case AMQP_UNKNOWN:
- default:
- body = new AmqpValue(((TextMessage) message).getText());
- break;
- }
- } else if (message instanceof ServerJMSMapMessage) {
- body = new AmqpValue(getMapFromMessageBody((ServerJMSMapMessage) message));
- } else if (message instanceof ServerJMSStreamMessage) {
- ArrayList<Object> list = new ArrayList<>();
- final ServerJMSStreamMessage m = (ServerJMSStreamMessage) message;
- try {
- while (true) {
- list.add(m.readObject());
- }
- } catch (MessageEOFException e) {
- }
-
- // Deprecated encoding markers - TODO - Remove on future release
- if (orignalEncoding == AMQP_UNKNOWN) {
- String amqpType = message.getStringProperty(AMQPMessageTypes.AMQP_TYPE_KEY);
- if (amqpType != null) {
- if (amqpType.equals(AMQPMessageTypes.AMQP_LIST)) {
- orignalEncoding = AMQP_VALUE_LIST;
- } else {
- orignalEncoding = AMQP_SEQUENCE;
- }
- }
- }
-
- switch (orignalEncoding) {
- case AMQP_SEQUENCE:
- body = new AmqpSequence(list);
- break;
- case AMQP_VALUE_LIST:
- case AMQP_UNKNOWN:
- default:
- body = new AmqpValue(list);
- break;
- }
- } else if (message instanceof ServerJMSObjectMessage) {
- Binary payload = getBinaryFromMessageBody((ServerJMSObjectMessage) message);
-
- if (payload == null) {
- payload = EMPTY_BINARY;
- }
-
- switch (orignalEncoding) {
- case AMQP_VALUE_BINARY:
- body = new AmqpValue(payload);
- break;
- case AMQP_DATA:
- case AMQP_UNKNOWN:
- default:
- body = new Data(payload);
- break;
- }
-
- // For a non-AMQP message we tag the outbound content type as containing
- // a serialized Java object so that an AMQP client has a hint as to what
- // we are sending it.
- if (!message.propertyExists(JMS_AMQP_CONTENT_TYPE)) {
- message.setStringProperty(JMS_AMQP_CONTENT_TYPE, SERIALIZED_JAVA_OBJECT_CONTENT_TYPE);
- }
- } else if (message instanceof ServerJMSMessage) {
- // If this is not an AMQP message that was converted then the original encoding
- // will be unknown so we check for special cases of messages with special data
- // encoded into the server message body.
- if (orignalEncoding == AMQP_UNKNOWN) {
- MessageInternal internalMessage = message.getInnerMessage();
- int readerIndex = internalMessage.getBodyBuffer().readerIndex();
- try {
- Object s = internalMessage.getBodyBuffer().readNullableSimpleString();
- if (s != null) {
- body = new AmqpValue(s.toString());
- }
- } catch (Throwable ignored) {
- logger.debug("Exception ignored during conversion, should be ok!", ignored.getMessage(), ignored);
- } finally {
- internalMessage.getBodyBuffer().readerIndex(readerIndex);
- }
- }
- }
-
- return body;
- }
-
- private Binary getBinaryFromMessageBody(ServerJMSBytesMessage message) throws JMSException {
- byte[] data = new byte[(int) message.getBodyLength()];
- message.readBytes(data);
- message.reset(); // Need to reset after readBytes or future readBytes
-
- return new Binary(data);
- }
-
- private Binary getBinaryFromMessageBody(ServerJMSTextMessage message) throws JMSException {
- Binary result = null;
- String text = message.getText();
- if (text != null) {
- result = new Binary(text.getBytes(StandardCharsets.UTF_8));
- }
-
- return result;
- }
-
- private Binary getBinaryFromMessageBody(ServerJMSObjectMessage message) throws JMSException {
- message.getInnerMessage().getBodyBuffer().resetReaderIndex();
- int size = message.getInnerMessage().getBodyBuffer().readInt();
- byte[] bytes = new byte[size];
- message.getInnerMessage().getBodyBuffer().readBytes(bytes);
-
- return new Binary(bytes);
- }
-
- private Map<String, Object> getMapFromMessageBody(ServerJMSMapMessage message) throws JMSException {
- final HashMap<String, Object> map = new LinkedHashMap<>();
-
- @SuppressWarnings("unchecked")
- final Enumeration<String> names = message.getMapNames();
- while (names.hasMoreElements()) {
- String key = names.nextElement();
- Object value = message.getObject(key);
- if (value instanceof byte[]) {
- value = new Binary((byte[]) value);
- }
- map.put(key, value);
- }
-
- return map;
- }
-
- private static byte destinationType(Destination destination) {
- if (destination instanceof Queue) {
- if (destination instanceof TemporaryQueue) {
- return TEMP_QUEUE_TYPE;
- } else {
- return QUEUE_TYPE;
- }
- } else if (destination instanceof Topic) {
- if (destination instanceof TemporaryTopic) {
- return TEMP_TOPIC_TYPE;
- } else {
- return TOPIC_TYPE;
- }
- }
-
- throw new IllegalArgumentException("Unknown Destination Type passed to JMS Transformer.");
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/669e7cf2/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/OutboundTransformer.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/OutboundTransformer.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/OutboundTransformer.java
deleted file mode 100644
index 5113513..0000000
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/OutboundTransformer.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.protocol.amqp.converter.message;
-
-import java.io.UnsupportedEncodingException;
-
-import javax.jms.JMSException;
-
-import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMessage;
-import org.apache.activemq.artemis.utils.IDGenerator;
-import org.apache.qpid.proton.codec.WritableBuffer;
-
-public abstract class OutboundTransformer {
-
- protected IDGenerator idGenerator;
-
- public OutboundTransformer(IDGenerator idGenerator) {
- this.idGenerator = idGenerator;
- }
-
- /**
- * Given an JMS Message perform a conversion to an AMQP Message and encode into a form that
- * is ready for transmission.
- *
- * @param message
- * the message to transform
- * @param buffer
- * the buffer where encoding should write to
- *
- * @return the message format key of the encoded message.
- *
- * @throws JMSException
- * if an error occurs during message transformation
- * @throws UnsupportedEncodingException
- * if an error occurs during message encoding
- */
- public abstract long transform(ServerJMSMessage message, WritableBuffer buffer) throws JMSException, UnsupportedEncodingException;
-
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/669e7cf2/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
index 6462315..bac3e7e 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
@@ -134,6 +134,10 @@ public class AMQPConnectionContext extends ProtonInitializable {
handler.flush();
}
+ public void flush(boolean wait) {
+ handler.flush(wait);
+ }
+
public void close(ErrorCondition errorCondition) {
handler.close(errorCondition);
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/669e7cf2/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
index 8341de7..ea2635e 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
@@ -134,6 +134,7 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
@Override
public void onMessage(Delivery delivery) throws ActiveMQAMQPException {
Receiver receiver;
+ ByteBuf buffer = null;
try {
receiver = ((Receiver) delivery.getLink());
@@ -144,26 +145,30 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
if (delivery.isPartial()) {
return;
}
+ // This should be used if getDataLength was available
+// byte[] data = new byte[delivery.getDataLength()];
- ByteBuf buffer = PooledByteBufAllocator.DEFAULT.heapBuffer(10 * 1024);
- try {
- synchronized (connection.getLock()) {
- DeliveryUtil.readDelivery(receiver, buffer);
+ buffer = PooledByteBufAllocator.DEFAULT.heapBuffer(10 * 1024);
+ Transaction tx = null;
- receiver.advance();
+ synchronized (connection.getLock()) {
+ DeliveryUtil.readDelivery(receiver, buffer);
+ receiver.advance();
+ }
- Transaction tx = null;
- if (delivery.getRemoteState() instanceof TransactionalState) {
+ byte[] data = new byte[buffer.writerIndex()];
+ buffer.readBytes(data);
- TransactionalState txState = (TransactionalState) delivery.getRemoteState();
- tx = this.sessionSPI.getTransaction(txState.getTxnId());
- }
- sessionSPI.serverSend(tx, receiver, delivery, address, delivery.getMessageFormat(), buffer);
+ if (delivery.getRemoteState() instanceof TransactionalState) {
- flow(maxCreditAllocation, minCreditRefresh);
- }
- } finally {
- buffer.release();
+ TransactionalState txState = (TransactionalState) delivery.getRemoteState();
+ tx = this.sessionSPI.getTransaction(txState.getTxnId());
+ }
+
+ sessionSPI.serverSend(tx, receiver, delivery, address, delivery.getMessageFormat(), data);
+
+ synchronized (connection.getLock()) {
+ flow(maxCreditAllocation, minCreditRefresh);
}
} catch (Exception e) {
log.warn(e.getMessage(), e);
@@ -174,6 +179,10 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
rejected.setError(condition);
delivery.disposition(rejected);
delivery.settle();
+ } finally {
+ if (buffer != null) {
+ buffer.release();
+ }
}
}