You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2018/01/22 23:06:06 UTC
[02/10] activemq-artemis git commit: ARTEMIS-1616 OpenWire improvements
ARTEMIS-1616 OpenWire improvements
Refactored OpenWireMessageConverter::toAMQMessage into smaller methods
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/c6b6dd95
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/c6b6dd95
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/c6b6dd95
Branch: refs/heads/master
Commit: c6b6dd95d1665230d667557df240d8a62a2118af
Parents: e7a1dca
Author: Francesco Nigro <ni...@gmail.com>
Authored: Wed Jan 17 14:37:08 2018 +0100
Committer: Clebert Suconic <cl...@apache.org>
Committed: Mon Jan 22 18:02:03 2018 -0500
----------------------------------------------------------------------
.../openwire/OpenWireMessageConverter.java | 562 +++++++++++--------
1 file changed, 316 insertions(+), 246 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c6b6dd95/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
index 0948f8a..3dc4a4e 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
@@ -120,7 +120,7 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
return null;
}
-// @Override
+ // @Override
public org.apache.activemq.artemis.api.core.Message inbound(Object message, CoreMessageObjectPools coreMessageObjectPools) throws Exception {
final Message messageSend = (Message) message;
@@ -205,7 +205,9 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
final ProducerId producerId = messageSend.getProducerId();
if (producerId != null) {
- putMsgProducerId(producerId, marshaller, coreMessage);
+ final ByteSequence producerIdBytes = marshaller.marshal(producerId);
+ producerIdBytes.compact();
+ coreMessage.putBytesProperty(AMQ_MSG_PRODUCER_ID, producerIdBytes.data);
}
final ByteSequence propBytes = messageSend.getMarshalledProperties();
if (propBytes != null) {
@@ -437,14 +439,6 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
coreMessage.putBytesProperty(AMQ_MSG_DATASTRUCTURE, dsBytes.data);
}
- private static void putMsgProducerId(final ProducerId producerId,
- final WireFormat marshaller,
- final CoreMessage coreMessage) throws IOException {
- final ByteSequence producerIdBytes = marshaller.marshal(producerId);
- producerIdBytes.compact();
- coreMessage.putBytesProperty(AMQ_MSG_PRODUCER_ID, producerIdBytes.data);
- }
-
private static void putMsgMarshalledProperties(final ByteSequence propBytes,
final Message messageSend,
final CoreMessage coreMessage) throws IOException {
@@ -512,9 +506,9 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
}
public MessageDispatch createMessageDispatch(MessageReference reference,
- ICoreMessage message,
- AMQConsumer consumer) throws IOException, JMSException {
- ActiveMQMessage amqMessage = toAMQMessage(reference, message, consumer);
+ ICoreMessage message,
+ AMQConsumer consumer) throws IOException, JMSException {
+ ActiveMQMessage amqMessage = toAMQMessage(reference, message, marshaller, consumer);
//we can use core message id for sequenceId
amqMessage.getMessageId().setBrokerSequenceId(message.getMessageID());
@@ -529,35 +523,48 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
return md;
}
- private ActiveMQMessage toAMQMessage(MessageReference reference,
+ private static ActiveMQMessage toAMQMessage(MessageReference reference,
ICoreMessage coreMessage,
+ WireFormat marshaller,
AMQConsumer consumer) throws IOException {
- ActiveMQMessage amqMsg = null;
- byte coreType = coreMessage.getType();
+ final ActiveMQMessage amqMsg;
+ final byte coreType = coreMessage.getType();
+ final Boolean compressProp = (Boolean) coreMessage.getObjectProperty(AMQ_MSG_COMPRESSED);
+ final boolean isCompressed = compressProp == null ? false : compressProp.booleanValue();
+ final byte[] bytes;
+ final ActiveMQBuffer buffer = coreMessage.getReadOnlyBodyBuffer();
+ buffer.resetReaderIndex();
+
switch (coreType) {
case org.apache.activemq.artemis.api.core.Message.BYTES_TYPE:
amqMsg = new ActiveMQBytesMessage();
+ bytes = toAMQMessageBytesType(buffer, isCompressed);
break;
case org.apache.activemq.artemis.api.core.Message.MAP_TYPE:
amqMsg = new ActiveMQMapMessage();
+ bytes = toAMQMessageMapType(buffer, isCompressed);
break;
case org.apache.activemq.artemis.api.core.Message.OBJECT_TYPE:
amqMsg = new ActiveMQObjectMessage();
+ bytes = toAMQMessageObjectType(buffer, isCompressed);
break;
case org.apache.activemq.artemis.api.core.Message.STREAM_TYPE:
amqMsg = new ActiveMQStreamMessage();
+ bytes = toAMQMessageStreamType(buffer, isCompressed);
break;
case org.apache.activemq.artemis.api.core.Message.TEXT_TYPE:
amqMsg = new ActiveMQTextMessage();
+ bytes = toAMQMessageTextType(buffer, isCompressed);
break;
case org.apache.activemq.artemis.api.core.Message.DEFAULT_TYPE:
amqMsg = new ActiveMQMessage();
+ bytes = toAMQMessageDefaultType(buffer, isCompressed);
break;
default:
throw new IllegalStateException("Unknown message type: " + coreMessage.getType());
}
- String type = coreMessage.getStringProperty(JMS_TYPE_PROPERTY);
+ final String type = coreMessage.getStringProperty(JMS_TYPE_PROPERTY);
if (type != null) {
amqMsg.setJMSType(type);
}
@@ -572,165 +579,8 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
}
amqMsg.setBrokerInTime(brokerInTime);
- ActiveMQBuffer buffer = coreMessage.getReadOnlyBodyBuffer();
- Boolean compressProp = (Boolean) coreMessage.getObjectProperty(AMQ_MSG_COMPRESSED);
- boolean isCompressed = compressProp == null ? false : compressProp.booleanValue();
amqMsg.setCompressed(isCompressed);
- byte[] bytes = null;
- if (buffer != null) {
- buffer.resetReaderIndex();
-
- if (coreType == org.apache.activemq.artemis.api.core.Message.TEXT_TYPE) {
- SimpleString text = buffer.readNullableSimpleString();
- if (text != null) {
- ByteArrayOutputStream bytesOut = new ByteArrayOutputStream(text.length() + 4);
- OutputStream out = bytesOut;
- if (isCompressed) {
- out = new DeflaterOutputStream(out, true);
- }
- try (DataOutputStream dataOut = new DataOutputStream(out)) {
- MarshallingSupport.writeUTF8(dataOut, text.toString());
- dataOut.flush();
- bytes = bytesOut.toByteArray();
- }
- }
- } else if (coreType == org.apache.activemq.artemis.api.core.Message.MAP_TYPE) {
- TypedProperties mapData = new TypedProperties();
- //it could be a null map
- if (buffer.readableBytes() > 0) {
- mapData.decode(buffer.byteBuf());
- Map<String, Object> map = mapData.getMap();
- ByteArrayOutputStream out = new ByteArrayOutputStream(mapData.getEncodeSize());
- OutputStream os = out;
- if (isCompressed) {
- os = new DeflaterOutputStream(os, true);
- }
- try (DataOutputStream dataOut = new DataOutputStream(os)) {
- MarshallingSupport.marshalPrimitiveMap(map, dataOut);
- dataOut.flush();
- }
- bytes = out.toByteArray();
- }
-
- } else if (coreType == org.apache.activemq.artemis.api.core.Message.OBJECT_TYPE) {
- if (buffer.readableBytes() > 0) {
- int len = buffer.readInt();
- bytes = new byte[len];
- buffer.readBytes(bytes);
- if (isCompressed) {
- ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
- try (DeflaterOutputStream out = new DeflaterOutputStream(bytesOut, true)) {
- out.write(bytes);
- out.flush();
- }
- bytes = bytesOut.toByteArray();
- }
- }
- } else if (coreType == org.apache.activemq.artemis.api.core.Message.STREAM_TYPE) {
- org.apache.activemq.util.ByteArrayOutputStream bytesOut = new org.apache.activemq.util.ByteArrayOutputStream();
- OutputStream out = bytesOut;
- if (isCompressed) {
- out = new DeflaterOutputStream(bytesOut, true);
- }
- try (DataOutputStream dataOut = new DataOutputStream(out)) {
-
- boolean stop = false;
- while (!stop && buffer.readable()) {
- byte primitiveType = buffer.readByte();
- switch (primitiveType) {
- case DataConstants.BOOLEAN:
- MarshallingSupport.marshalBoolean(dataOut, buffer.readBoolean());
- break;
- case DataConstants.BYTE:
- MarshallingSupport.marshalByte(dataOut, buffer.readByte());
- break;
- case DataConstants.BYTES:
- int len = buffer.readInt();
- byte[] bytesData = new byte[len];
- buffer.readBytes(bytesData);
- MarshallingSupport.marshalByteArray(dataOut, bytesData);
- break;
- case DataConstants.CHAR:
- char ch = (char) buffer.readShort();
- MarshallingSupport.marshalChar(dataOut, ch);
- break;
- case DataConstants.DOUBLE:
- double doubleVal = Double.longBitsToDouble(buffer.readLong());
- MarshallingSupport.marshalDouble(dataOut, doubleVal);
- break;
- case DataConstants.FLOAT:
- Float floatVal = Float.intBitsToFloat(buffer.readInt());
- MarshallingSupport.marshalFloat(dataOut, floatVal);
- break;
- case DataConstants.INT:
- MarshallingSupport.marshalInt(dataOut, buffer.readInt());
- break;
- case DataConstants.LONG:
- MarshallingSupport.marshalLong(dataOut, buffer.readLong());
- break;
- case DataConstants.SHORT:
- MarshallingSupport.marshalShort(dataOut, buffer.readShort());
- break;
- case DataConstants.STRING:
- String string = buffer.readNullableString();
- if (string == null) {
- MarshallingSupport.marshalNull(dataOut);
- } else {
- MarshallingSupport.marshalString(dataOut, string);
- }
- break;
- default:
- //now we stop
- stop = true;
- break;
- }
- dataOut.flush();
- }
- }
- bytes = bytesOut.toByteArray();
- } else if (coreType == org.apache.activemq.artemis.api.core.Message.BYTES_TYPE) {
- int n = buffer.readableBytes();
- bytes = new byte[n];
- buffer.readBytes(bytes);
- if (isCompressed) {
- int length = bytes.length;
- Deflater deflater = new Deflater();
- try (org.apache.activemq.util.ByteArrayOutputStream compressed = new org.apache.activemq.util.ByteArrayOutputStream()) {
- compressed.write(new byte[4]);
- deflater.setInput(bytes);
- deflater.finish();
- byte[] bytesBuf = new byte[1024];
- while (!deflater.finished()) {
- int count = deflater.deflate(bytesBuf);
- compressed.write(bytesBuf, 0, count);
- }
- compressed.flush();
- ByteSequence byteSeq = compressed.toByteSequence();
- ByteSequenceData.writeIntBig(byteSeq, length);
- bytes = Arrays.copyOfRange(byteSeq.data, 0, byteSeq.length);
- } finally {
- deflater.end();
- }
- }
- } else {
- int n = buffer.readableBytes();
- bytes = new byte[n];
- buffer.readBytes(bytes);
- if (isCompressed) {
- try (ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
- DeflaterOutputStream out = new DeflaterOutputStream(bytesOut, true)) {
- out.write(bytes);
- out.flush();
- bytes = bytesOut.toByteArray();
- }
- }
- }
-
- buffer.resetReaderIndex();// this is important for topics as the buffer
- // may be read multiple times
- }
-
//we need check null because messages may come from other clients
//and those amq specific attribute may not be set.
Long arrival = (Long) coreMessage.getObjectProperty(AMQ_MSG_ARRIVAL);
@@ -740,24 +590,14 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
}
amqMsg.setArrival(arrival);
- String brokerPath = (String) coreMessage.getObjectProperty(AMQ_MSG_BROKER_PATH);
- if (brokerPath != null && brokerPath.isEmpty()) {
- String[] brokers = brokerPath.split(",");
- BrokerId[] bids = new BrokerId[brokers.length];
- for (int i = 0; i < bids.length; i++) {
- bids[i] = new BrokerId(brokers[i]);
- }
- amqMsg.setBrokerPath(bids);
+ final String brokerPath = (String) coreMessage.getObjectProperty(AMQ_MSG_BROKER_PATH);
+ if (brokerPath != null && !brokerPath.isEmpty()) {
+ setAMQMsgBrokerPath(amqMsg, brokerPath);
}
- String clusterPath = (String) coreMessage.getObjectProperty(AMQ_MSG_CLUSTER);
- if (clusterPath != null && clusterPath.isEmpty()) {
- String[] cluster = clusterPath.split(",");
- BrokerId[] bids = new BrokerId[cluster.length];
- for (int i = 0; i < bids.length; i++) {
- bids[i] = new BrokerId(cluster[i]);
- }
- amqMsg.setCluster(bids);
+ final String clusterPath = (String) coreMessage.getObjectProperty(AMQ_MSG_CLUSTER);
+ if (clusterPath != null && !clusterPath.isEmpty()) {
+ setAMQMsgClusterPath(amqMsg, clusterPath);
}
Integer commandId = (Integer) coreMessage.getObjectProperty(AMQ_MSG_COMMAND_ID);
@@ -766,21 +606,19 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
}
amqMsg.setCommandId(commandId);
- SimpleString corrId = (SimpleString) coreMessage.getObjectProperty(JMS_CORRELATION_ID_PROPERTY);
+ final SimpleString corrId = (SimpleString) coreMessage.getObjectProperty(JMS_CORRELATION_ID_PROPERTY);
if (corrId != null) {
amqMsg.setCorrelationId(corrId.toString());
}
- byte[] dsBytes = (byte[]) coreMessage.getObjectProperty(AMQ_MSG_DATASTRUCTURE);
+ final byte[] dsBytes = (byte[]) coreMessage.getObjectProperty(AMQ_MSG_DATASTRUCTURE);
if (dsBytes != null) {
- ByteSequence seq = new ByteSequence(dsBytes);
- DataStructure ds = (DataStructure) marshaller.unmarshal(seq);
- amqMsg.setDataStructure(ds);
+ setAMQMsgDataStructure(amqMsg, marshaller, dsBytes);
}
final ActiveMQDestination actualDestination = consumer.getOpenwireDestination();
amqMsg.setDestination(OpenWireUtil.toAMQAddress(coreMessage, actualDestination));
- Object value = coreMessage.getGroupID();
+ final Object value = coreMessage.getGroupID();
if (value != null) {
String groupId = value.toString();
amqMsg.setGroupID(groupId);
@@ -792,8 +630,8 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
}
amqMsg.setGroupSequence(groupSequence);
- MessageId mid = null;
- byte[] midBytes = (byte[]) coreMessage.getObjectProperty(AMQ_MSG_MESSAGE_ID);
+ final MessageId mid;
+ final byte[] midBytes = (byte[]) coreMessage.getObjectProperty(AMQ_MSG_MESSAGE_ID);
if (midBytes != null) {
ByteSequence midSeq = new ByteSequence(midBytes);
mid = (MessageId) marshaller.unmarshal(midSeq);
@@ -803,97 +641,329 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
amqMsg.setMessageId(mid);
- byte[] origDestBytes = (byte[]) coreMessage.getObjectProperty(AMQ_MSG_ORIG_DESTINATION);
+ final byte[] origDestBytes = (byte[]) coreMessage.getObjectProperty(AMQ_MSG_ORIG_DESTINATION);
if (origDestBytes != null) {
- ActiveMQDestination origDest = (ActiveMQDestination) marshaller.unmarshal(new ByteSequence(origDestBytes));
- amqMsg.setOriginalDestination(origDest);
+ setAMQMsgOriginalDestination(amqMsg, marshaller, origDestBytes);
}
- byte[] origTxIdBytes = (byte[]) coreMessage.getObjectProperty(AMQ_MSG_ORIG_TXID);
+ final byte[] origTxIdBytes = (byte[]) coreMessage.getObjectProperty(AMQ_MSG_ORIG_TXID);
if (origTxIdBytes != null) {
- TransactionId origTxId = (TransactionId) marshaller.unmarshal(new ByteSequence(origTxIdBytes));
- amqMsg.setOriginalTransactionId(origTxId);
+ setAMQMsgOriginalTransactionId(amqMsg, marshaller, origTxIdBytes);
}
- byte[] producerIdBytes = (byte[]) coreMessage.getObjectProperty(AMQ_MSG_PRODUCER_ID);
+ final byte[] producerIdBytes = (byte[]) coreMessage.getObjectProperty(AMQ_MSG_PRODUCER_ID);
if (producerIdBytes != null) {
ProducerId producerId = (ProducerId) marshaller.unmarshal(new ByteSequence(producerIdBytes));
amqMsg.setProducerId(producerId);
}
- byte[] marshalledBytes = (byte[]) coreMessage.getObjectProperty(AMQ_MSG_MARSHALL_PROP);
+ final byte[] marshalledBytes = (byte[]) coreMessage.getObjectProperty(AMQ_MSG_MARSHALL_PROP);
if (marshalledBytes != null) {
amqMsg.setMarshalledProperties(new ByteSequence(marshalledBytes));
}
amqMsg.setRedeliveryCounter(reference.getDeliveryCount() - 1);
- byte[] replyToBytes = (byte[]) coreMessage.getObjectProperty(AMQ_MSG_REPLY_TO);
+ final byte[] replyToBytes = (byte[]) coreMessage.getObjectProperty(AMQ_MSG_REPLY_TO);
if (replyToBytes != null) {
- ActiveMQDestination replyTo = (ActiveMQDestination) marshaller.unmarshal(new ByteSequence(replyToBytes));
- amqMsg.setReplyTo(replyTo);
+ setAMQMsgReplyTo(amqMsg, marshaller, replyToBytes);
}
- String userId = (String) coreMessage.getObjectProperty(AMQ_MSG_USER_ID);
+ final String userId = (String) coreMessage.getObjectProperty(AMQ_MSG_USER_ID);
if (userId != null) {
amqMsg.setUserID(userId);
}
- Boolean isDroppable = (Boolean) coreMessage.getObjectProperty(AMQ_MSG_DROPPABLE);
+ final Boolean isDroppable = (Boolean) coreMessage.getObjectProperty(AMQ_MSG_DROPPABLE);
if (isDroppable != null) {
amqMsg.setDroppable(isDroppable);
}
- SimpleString dlqCause = (SimpleString) coreMessage.getObjectProperty(AMQ_MSG_DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY);
+ final SimpleString dlqCause = (SimpleString) coreMessage.getObjectProperty(AMQ_MSG_DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY);
if (dlqCause != null) {
- try {
- amqMsg.setStringProperty(ActiveMQMessage.DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY, dlqCause.toString());
- } catch (JMSException e) {
- throw new IOException("failure to set dlq property " + dlqCause, e);
- }
+ setAMQMsgDlqDeliveryFailureCause(amqMsg, dlqCause);
}
- SimpleString lastValueProperty = coreMessage.getLastValueProperty();
+ final SimpleString lastValueProperty = coreMessage.getLastValueProperty();
if (lastValueProperty != null) {
- try {
- amqMsg.setStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_LAST_VALUE_NAME.toString(), lastValueProperty.toString());
- } catch (JMSException e) {
- throw new IOException("failure to set lvq property " + dlqCause, e);
- }
+ setAMQMsgHdrLastValueName(amqMsg, lastValueProperty);
}
- Set<SimpleString> props = coreMessage.getPropertyNames();
+ final Set<SimpleString> props = coreMessage.getPropertyNames();
if (props != null) {
- for (SimpleString s : props) {
- String keyStr = s.toString();
- if ((keyStr.startsWith("_AMQ") || keyStr.startsWith("__HDR_")) &&
- !consumer.hasNotificationDestination()) {
- continue;
+ setAMQMsgObjectProperties(amqMsg, coreMessage, props, consumer);
+ }
+
+ if (bytes != null) {
+ ByteSequence content = new ByteSequence(bytes);
+ amqMsg.setContent(content);
+ }
+ return amqMsg;
+ }
+
+ private static byte[] toAMQMessageTextType(final ActiveMQBuffer buffer,
+ final boolean isCompressed) throws IOException {
+ byte[] bytes = null;
+ SimpleString text = buffer.readNullableSimpleString();
+ if (text != null) {
+ ByteArrayOutputStream bytesOut = new ByteArrayOutputStream(text.length() + 4);
+ OutputStream out = bytesOut;
+ if (isCompressed) {
+ out = new DeflaterOutputStream(out, true);
+ }
+ try (DataOutputStream dataOut = new DataOutputStream(out)) {
+ MarshallingSupport.writeUTF8(dataOut, text.toString());
+ dataOut.flush();
+ bytes = bytesOut.toByteArray();
+ }
+ }
+ return bytes;
+ }
+
+ private static byte[] toAMQMessageMapType(final ActiveMQBuffer buffer,
+ final boolean isCompressed) throws IOException {
+ byte[] bytes = null;
+ //it could be a null map
+ if (buffer.readableBytes() > 0) {
+ TypedProperties mapData = new TypedProperties();
+ mapData.decode(buffer.byteBuf());
+ Map<String, Object> map = mapData.getMap();
+ ByteArrayOutputStream out = new ByteArrayOutputStream(mapData.getEncodeSize());
+ OutputStream os = out;
+ if (isCompressed) {
+ os = new DeflaterOutputStream(os, true);
+ }
+ try (DataOutputStream dataOut = new DataOutputStream(os)) {
+ MarshallingSupport.marshalPrimitiveMap(map, dataOut);
+ dataOut.flush();
+ }
+ bytes = out.toByteArray();
+ }
+ return bytes;
+ }
+
+ private static byte[] toAMQMessageObjectType(final ActiveMQBuffer buffer,
+ final boolean isCompressed) throws IOException {
+ byte[] bytes = null;
+ if (buffer.readableBytes() > 0) {
+ int len = buffer.readInt();
+ bytes = new byte[len];
+ buffer.readBytes(bytes);
+ if (isCompressed) {
+ ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
+ try (DeflaterOutputStream out = new DeflaterOutputStream(bytesOut, true)) {
+ out.write(bytes);
+ out.flush();
}
- Object prop = coreMessage.getObjectProperty(s);
- try {
- if (prop instanceof SimpleString) {
- amqMsg.setObjectProperty(s.toString(), prop.toString());
- } else {
- if (keyStr.equals(MessageUtil.JMSXDELIVERYCOUNT) && prop instanceof Long) {
- Long l = (Long) prop;
- amqMsg.setObjectProperty(s.toString(), l.intValue());
+ bytes = bytesOut.toByteArray();
+ }
+ }
+ return bytes;
+ }
+
+ private static byte[] toAMQMessageStreamType(final ActiveMQBuffer buffer,
+ final boolean isCompressed) throws IOException {
+ byte[] bytes;
+ org.apache.activemq.util.ByteArrayOutputStream bytesOut = new org.apache.activemq.util.ByteArrayOutputStream();
+ OutputStream out = bytesOut;
+ if (isCompressed) {
+ out = new DeflaterOutputStream(bytesOut, true);
+ }
+ try (DataOutputStream dataOut = new DataOutputStream(out)) {
+
+ boolean stop = false;
+ while (!stop && buffer.readable()) {
+ byte primitiveType = buffer.readByte();
+ switch (primitiveType) {
+ case DataConstants.BOOLEAN:
+ MarshallingSupport.marshalBoolean(dataOut, buffer.readBoolean());
+ break;
+ case DataConstants.BYTE:
+ MarshallingSupport.marshalByte(dataOut, buffer.readByte());
+ break;
+ case DataConstants.BYTES:
+ int len = buffer.readInt();
+ byte[] bytesData = new byte[len];
+ buffer.readBytes(bytesData);
+ MarshallingSupport.marshalByteArray(dataOut, bytesData);
+ break;
+ case DataConstants.CHAR:
+ char ch = (char) buffer.readShort();
+ MarshallingSupport.marshalChar(dataOut, ch);
+ break;
+ case DataConstants.DOUBLE:
+ double doubleVal = Double.longBitsToDouble(buffer.readLong());
+ MarshallingSupport.marshalDouble(dataOut, doubleVal);
+ break;
+ case DataConstants.FLOAT:
+ Float floatVal = Float.intBitsToFloat(buffer.readInt());
+ MarshallingSupport.marshalFloat(dataOut, floatVal);
+ break;
+ case DataConstants.INT:
+ MarshallingSupport.marshalInt(dataOut, buffer.readInt());
+ break;
+ case DataConstants.LONG:
+ MarshallingSupport.marshalLong(dataOut, buffer.readLong());
+ break;
+ case DataConstants.SHORT:
+ MarshallingSupport.marshalShort(dataOut, buffer.readShort());
+ break;
+ case DataConstants.STRING:
+ String string = buffer.readNullableString();
+ if (string == null) {
+ MarshallingSupport.marshalNull(dataOut);
} else {
- amqMsg.setObjectProperty(s.toString(), prop);
+ MarshallingSupport.marshalString(dataOut, string);
}
- }
- } catch (JMSException e) {
- throw new IOException("exception setting property " + s + " : " + prop, e);
+ break;
+ default:
+ //now we stop
+ stop = true;
+ break;
}
+ dataOut.flush();
}
}
+ bytes = bytesOut.toByteArray();
+ return bytes;
+ }
- amqMsg.setCompressed(isCompressed);
- if (bytes != null) {
- ByteSequence content = new ByteSequence(bytes);
- amqMsg.setContent(content);
+ private static byte[] toAMQMessageBytesType(final ActiveMQBuffer buffer,
+ final boolean isCompressed) throws IOException {
+ int n = buffer.readableBytes();
+ byte[] bytes = new byte[n];
+ buffer.readBytes(bytes);
+ if (isCompressed) {
+ bytes = toAMQMessageCompressedBytesType(bytes);
+ }
+ return bytes;
+ }
+
+ private static byte[] toAMQMessageCompressedBytesType(final byte[] bytes) throws IOException {
+ int length = bytes.length;
+ Deflater deflater = new Deflater();
+ try (org.apache.activemq.util.ByteArrayOutputStream compressed = new org.apache.activemq.util.ByteArrayOutputStream()) {
+ compressed.write(new byte[4]);
+ deflater.setInput(bytes);
+ deflater.finish();
+ byte[] bytesBuf = new byte[1024];
+ while (!deflater.finished()) {
+ int count = deflater.deflate(bytesBuf);
+ compressed.write(bytesBuf, 0, count);
+ }
+ compressed.flush();
+ ByteSequence byteSeq = compressed.toByteSequence();
+ ByteSequenceData.writeIntBig(byteSeq, length);
+ return Arrays.copyOfRange(byteSeq.data, 0, byteSeq.length);
+ } finally {
+ deflater.end();
}
- return amqMsg;
}
+ private static byte[] toAMQMessageDefaultType(final ActiveMQBuffer buffer,
+ final boolean isCompressed) throws IOException {
+ int n = buffer.readableBytes();
+ byte[] bytes = new byte[n];
+ buffer.readBytes(bytes);
+ if (isCompressed) {
+ try (ByteArrayOutputStream bytesOut = new ByteArrayOutputStream(); DeflaterOutputStream out = new DeflaterOutputStream(bytesOut, true)) {
+ out.write(bytes);
+ out.flush();
+ bytes = bytesOut.toByteArray();
+ }
+ }
+ return bytes;
+ }
+
+ private static void setAMQMsgBrokerPath(final ActiveMQMessage amqMsg, final String brokerPath) {
+ String[] brokers = brokerPath.split(",");
+ BrokerId[] bids = new BrokerId[brokers.length];
+ for (int i = 0; i < bids.length; i++) {
+ bids[i] = new BrokerId(brokers[i]);
+ }
+ amqMsg.setBrokerPath(bids);
+ }
+
+ private static void setAMQMsgClusterPath(final ActiveMQMessage amqMsg, final String clusterPath) {
+ String[] cluster = clusterPath.split(",");
+ BrokerId[] bids = new BrokerId[cluster.length];
+ for (int i = 0; i < bids.length; i++) {
+ bids[i] = new BrokerId(cluster[i]);
+ }
+ amqMsg.setCluster(bids);
+ }
+
+ private static void setAMQMsgDataStructure(final ActiveMQMessage amqMsg,
+ final WireFormat marshaller,
+ final byte[] dsBytes) throws IOException {
+ ByteSequence seq = new ByteSequence(dsBytes);
+ DataStructure ds = (DataStructure) marshaller.unmarshal(seq);
+ amqMsg.setDataStructure(ds);
+ }
+
+ private static void setAMQMsgOriginalDestination(final ActiveMQMessage amqMsg,
+ final WireFormat marshaller,
+ final byte[] origDestBytes) throws IOException {
+ ActiveMQDestination origDest = (ActiveMQDestination) marshaller.unmarshal(new ByteSequence(origDestBytes));
+ amqMsg.setOriginalDestination(origDest);
+ }
+
+ private static void setAMQMsgOriginalTransactionId(final ActiveMQMessage amqMsg,
+ final WireFormat marshaller,
+ final byte[] origTxIdBytes) throws IOException {
+ TransactionId origTxId = (TransactionId) marshaller.unmarshal(new ByteSequence(origTxIdBytes));
+ amqMsg.setOriginalTransactionId(origTxId);
+ }
+
+ private static void setAMQMsgReplyTo(final ActiveMQMessage amqMsg,
+ final WireFormat marshaller,
+ final byte[] replyToBytes) throws IOException {
+ ActiveMQDestination replyTo = (ActiveMQDestination) marshaller.unmarshal(new ByteSequence(replyToBytes));
+ amqMsg.setReplyTo(replyTo);
+ }
+
+ private static void setAMQMsgDlqDeliveryFailureCause(final ActiveMQMessage amqMsg,
+ final SimpleString dlqCause) throws IOException {
+ try {
+ amqMsg.setStringProperty(ActiveMQMessage.DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY, dlqCause.toString());
+ } catch (JMSException e) {
+ throw new IOException("failure to set dlq property " + dlqCause, e);
+ }
+ }
+
+ private static void setAMQMsgHdrLastValueName(final ActiveMQMessage amqMsg,
+ final SimpleString lastValueProperty) throws IOException {
+ try {
+ amqMsg.setStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_LAST_VALUE_NAME.toString(), lastValueProperty.toString());
+ } catch (JMSException e) {
+ throw new IOException("failure to set lvq property " + lastValueProperty, e);
+ }
+ }
+
+ private static void setAMQMsgObjectProperties(final ActiveMQMessage amqMsg,
+ final ICoreMessage coreMessage,
+ final Set<SimpleString> props,
+ final AMQConsumer consumer) throws IOException {
+ for (SimpleString s : props) {
+ final String keyStr = s.toString();
+ if (!consumer.hasNotificationDestination() && (keyStr.startsWith("_AMQ") || keyStr.startsWith("__HDR_"))) {
+ continue;
+ }
+ final Object prop = coreMessage.getObjectProperty(s);
+ try {
+ if (prop instanceof SimpleString) {
+ amqMsg.setObjectProperty(keyStr, prop.toString());
+ } else {
+ if (keyStr.equals(MessageUtil.JMSXDELIVERYCOUNT) && prop instanceof Long) {
+ Long l = (Long) prop;
+ amqMsg.setObjectProperty(keyStr, l.intValue());
+ } else {
+ amqMsg.setObjectProperty(keyStr, prop);
+ }
+ }
+ } catch (JMSException e) {
+ throw new IOException("exception setting property " + s + " : " + prop, e);
+ }
+ }
+ }
}