You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2018/01/22 23:06:10 UTC
[06/10] activemq-artemis git commit: ARTEMIS-1616 OpenWire improvements
ARTEMIS-1616 OpenWire improvements
Used SimpleString on OpenWireMessageConverter to avoid translations on CoreMessage
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/64724c35
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/64724c35
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/64724c35
Branch: refs/heads/master
Commit: 64724c3520586c6cd1bc0aec9942ae5bb5562459
Parents: 17c0a33
Author: Francesco Nigro <ni...@gmail.com>
Authored: Tue Jan 16 22:24:08 2018 +0100
Committer: Clebert Suconic <cl...@apache.org>
Committed: Mon Jan 22 18:02:03 2018 -0500
----------------------------------------------------------------------
.../openwire/OpenWireMessageConverter.java | 331 ++++++++++---------
.../core/protocol/openwire/amq/AMQConsumer.java | 2 +-
2 files changed, 167 insertions(+), 166 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64724c35/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
index 54f3c99..457593d 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
@@ -71,29 +71,31 @@ import org.fusesource.hawtbuf.UTF8Buffer;
public class OpenWireMessageConverter implements MessageConverter<OpenwireMessage> {
- public static final String AMQ_PREFIX = "__HDR_";
- public static final String AMQ_MSG_DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY = AMQ_PREFIX + "dlqDeliveryFailureCause";
-
- private static final String AMQ_MSG_ARRIVAL = AMQ_PREFIX + "ARRIVAL";
- private static final String AMQ_MSG_BROKER_IN_TIME = AMQ_PREFIX + "BROKER_IN_TIME";
-
- private static final String AMQ_MSG_BROKER_PATH = AMQ_PREFIX + "BROKER_PATH";
- private static final String AMQ_MSG_CLUSTER = AMQ_PREFIX + "CLUSTER";
- private static final String AMQ_MSG_COMMAND_ID = AMQ_PREFIX + "COMMAND_ID";
- private static final String AMQ_MSG_DATASTRUCTURE = AMQ_PREFIX + "DATASTRUCTURE";
- private static final String AMQ_MSG_GROUP_ID = org.apache.activemq.artemis.api.core.Message.HDR_GROUP_ID.toString();
- private static final String AMQ_MSG_GROUP_SEQUENCE = AMQ_PREFIX + "GROUP_SEQUENCE";
- private static final String AMQ_MSG_MESSAGE_ID = AMQ_PREFIX + "MESSAGE_ID";
- private static final String AMQ_MSG_ORIG_DESTINATION = AMQ_PREFIX + "ORIG_DESTINATION";
- private static final String AMQ_MSG_ORIG_TXID = AMQ_PREFIX + "ORIG_TXID";
- private static final String AMQ_MSG_PRODUCER_ID = AMQ_PREFIX + "PRODUCER_ID";
- private static final String AMQ_MSG_MARSHALL_PROP = AMQ_PREFIX + "MARSHALL_PROP";
- private static final String AMQ_MSG_REPLY_TO = AMQ_PREFIX + "REPLY_TO";
-
- private static final String AMQ_MSG_USER_ID = AMQ_PREFIX + "USER_ID";
-
- private static final String AMQ_MSG_DROPPABLE = AMQ_PREFIX + "DROPPABLE";
- private static final String AMQ_MSG_COMPRESSED = AMQ_PREFIX + "COMPRESSED";
+ private static final SimpleString JMS_TYPE_PROPERTY = new SimpleString("JMSType");
+ private static final SimpleString JMS_CORRELATION_ID_PROPERTY = new SimpleString("JMSCorrelationID");
+ private static final SimpleString AMQ_PREFIX = new SimpleString("__HDR_");
+ public static final SimpleString AMQ_MSG_DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY = new SimpleString(AMQ_PREFIX + "dlqDeliveryFailureCause");
+
+ private static final SimpleString AMQ_MSG_ARRIVAL = new SimpleString(AMQ_PREFIX + "ARRIVAL");
+ private static final SimpleString AMQ_MSG_BROKER_IN_TIME = new SimpleString(AMQ_PREFIX + "BROKER_IN_TIME");
+
+ private static final SimpleString AMQ_MSG_BROKER_PATH = new SimpleString(AMQ_PREFIX + "BROKER_PATH");
+ private static final SimpleString AMQ_MSG_CLUSTER = new SimpleString(AMQ_PREFIX + "CLUSTER");
+ private static final SimpleString AMQ_MSG_COMMAND_ID = new SimpleString(AMQ_PREFIX + "COMMAND_ID");
+ private static final SimpleString AMQ_MSG_DATASTRUCTURE = new SimpleString(AMQ_PREFIX + "DATASTRUCTURE");
+ private static final SimpleString AMQ_MSG_GROUP_ID = org.apache.activemq.artemis.api.core.Message.HDR_GROUP_ID;
+ private static final SimpleString AMQ_MSG_GROUP_SEQUENCE = new SimpleString(AMQ_PREFIX + "GROUP_SEQUENCE");
+ private static final SimpleString AMQ_MSG_MESSAGE_ID = new SimpleString(AMQ_PREFIX + "MESSAGE_ID");
+ private static final SimpleString AMQ_MSG_ORIG_DESTINATION = new SimpleString(AMQ_PREFIX + "ORIG_DESTINATION");
+ private static final SimpleString AMQ_MSG_ORIG_TXID = new SimpleString(AMQ_PREFIX + "ORIG_TXID");
+ private static final SimpleString AMQ_MSG_PRODUCER_ID = new SimpleString(AMQ_PREFIX + "PRODUCER_ID");
+ private static final SimpleString AMQ_MSG_MARSHALL_PROP = new SimpleString(AMQ_PREFIX + "MARSHALL_PROP");
+ private static final SimpleString AMQ_MSG_REPLY_TO = new SimpleString(AMQ_PREFIX + "REPLY_TO");
+
+ private static final SimpleString AMQ_MSG_USER_ID = new SimpleString(AMQ_PREFIX + "USER_ID");
+
+ private static final SimpleString AMQ_MSG_DROPPABLE = new SimpleString(AMQ_PREFIX + "DROPPABLE");
+ private static final SimpleString AMQ_MSG_COMPRESSED = new SimpleString(AMQ_PREFIX + "COMPRESSED");
private static final String AMQ_NOTIFICATIONS_DESTINATION = "activemq.notifications";
@@ -128,7 +130,7 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
final String type = messageSend.getType();
if (type != null) {
- coreMessage.putStringProperty(new SimpleString("JMSType"), new SimpleString(type));
+ coreMessage.putStringProperty(JMS_TYPE_PROPERTY, new SimpleString(type));
}
coreMessage.setDurable(messageSend.isPersistent());
coreMessage.setExpiration(messageSend.getExpiration());
@@ -185,7 +187,7 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
coreMessage.putIntProperty(AMQ_MSG_COMMAND_ID, messageSend.getCommandId());
final String corrId = messageSend.getCorrelationId();
if (corrId != null) {
- coreMessage.putStringProperty("JMSCorrelationID", corrId);
+ coreMessage.putStringProperty(JMS_CORRELATION_ID_PROPERTY, new SimpleString(corrId));
}
final DataStructure ds = messageSend.getDataStructure();
if (ds != null) {
@@ -193,7 +195,7 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
}
final String groupId = messageSend.getGroupID();
if (groupId != null) {
- coreMessage.putStringProperty(AMQ_MSG_GROUP_ID, groupId);
+ coreMessage.putStringProperty(AMQ_MSG_GROUP_ID, new SimpleString(groupId));
}
coreMessage.putIntProperty(AMQ_MSG_GROUP_SEQUENCE, messageSend.getGroupSequence());
@@ -219,7 +221,7 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
final String userId = messageSend.getUserID();
if (userId != null) {
- coreMessage.putStringProperty(AMQ_MSG_USER_ID, userId);
+ coreMessage.putStringProperty(AMQ_MSG_USER_ID, new SimpleString(userId));
}
coreMessage.putBooleanProperty(AMQ_MSG_DROPPABLE, messageSend.isDroppable());
@@ -415,7 +417,7 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
builder.append(','); //is this separator safe?
}
}
- coreMessage.putStringProperty(AMQ_MSG_BROKER_PATH, builder.toString());
+ coreMessage.putStringProperty(AMQ_MSG_BROKER_PATH, new SimpleString(builder.toString()));
}
private static void putMsgCluster(final BrokerId[] cluster, final CoreMessage coreMessage) {
@@ -426,7 +428,7 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
builder.append(','); //is this separator safe?
}
}
- coreMessage.putStringProperty(AMQ_MSG_CLUSTER, builder.toString());
+ coreMessage.putStringProperty(AMQ_MSG_CLUSTER, new SimpleString(builder.toString()));
}
private static void putMsgDataStructure(final DataStructure ds,
@@ -557,7 +559,7 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
throw new IllegalStateException("Unknown message type: " + coreMessage.getType());
}
- String type = coreMessage.getStringProperty(new SimpleString("JMSType"));
+ String type = coreMessage.getStringProperty(JMS_TYPE_PROPERTY);
if (type != null) {
amqMsg.setJMSType(type);
}
@@ -580,156 +582,155 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
byte[] bytes = null;
if (buffer != null) {
buffer.resetReaderIndex();
- synchronized (buffer) {
- if (coreType == org.apache.activemq.artemis.api.core.Message.TEXT_TYPE) {
- SimpleString text = buffer.readNullableSimpleString();
- if (text != null) {
- ByteArrayOutputStream bytesOut = new ByteArrayOutputStream(text.length() + 4);
- OutputStream out = bytesOut;
- if (isCompressed) {
- out = new DeflaterOutputStream(out, true);
- }
- try (DataOutputStream dataOut = new DataOutputStream(out)) {
- MarshallingSupport.writeUTF8(dataOut, text.toString());
- dataOut.flush();
- bytes = bytesOut.toByteArray();
- }
- }
- } else if (coreType == org.apache.activemq.artemis.api.core.Message.MAP_TYPE) {
- TypedProperties mapData = new TypedProperties();
- //it could be a null map
- if (buffer.readableBytes() > 0) {
- mapData.decode(buffer.byteBuf());
- Map<String, Object> map = mapData.getMap();
- ByteArrayOutputStream out = new ByteArrayOutputStream(mapData.getEncodeSize());
- OutputStream os = out;
- if (isCompressed) {
- os = new DeflaterOutputStream(os, true);
- }
- try (DataOutputStream dataOut = new DataOutputStream(os)) {
- MarshallingSupport.marshalPrimitiveMap(map, dataOut);
- dataOut.flush();
- }
- bytes = out.toByteArray();
- }
- } else if (coreType == org.apache.activemq.artemis.api.core.Message.OBJECT_TYPE) {
- if (buffer.readableBytes() > 0) {
- int len = buffer.readInt();
- bytes = new byte[len];
- buffer.readBytes(bytes);
- if (isCompressed) {
- ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
- try (DeflaterOutputStream out = new DeflaterOutputStream(bytesOut, true)) {
- out.write(bytes);
- out.flush();
- }
- bytes = bytesOut.toByteArray();
- }
- }
- } else if (coreType == org.apache.activemq.artemis.api.core.Message.STREAM_TYPE) {
- org.apache.activemq.util.ByteArrayOutputStream bytesOut = new org.apache.activemq.util.ByteArrayOutputStream();
+ if (coreType == org.apache.activemq.artemis.api.core.Message.TEXT_TYPE) {
+ SimpleString text = buffer.readNullableSimpleString();
+ if (text != null) {
+ ByteArrayOutputStream bytesOut = new ByteArrayOutputStream(text.length() + 4);
OutputStream out = bytesOut;
if (isCompressed) {
- out = new DeflaterOutputStream(bytesOut, true);
+ out = new DeflaterOutputStream(out, true);
}
try (DataOutputStream dataOut = new DataOutputStream(out)) {
-
- boolean stop = false;
- while (!stop && buffer.readable()) {
- byte primitiveType = buffer.readByte();
- switch (primitiveType) {
- case DataConstants.BOOLEAN:
- MarshallingSupport.marshalBoolean(dataOut, buffer.readBoolean());
- break;
- case DataConstants.BYTE:
- MarshallingSupport.marshalByte(dataOut, buffer.readByte());
- break;
- case DataConstants.BYTES:
- int len = buffer.readInt();
- byte[] bytesData = new byte[len];
- buffer.readBytes(bytesData);
- MarshallingSupport.marshalByteArray(dataOut, bytesData);
- break;
- case DataConstants.CHAR:
- char ch = (char) buffer.readShort();
- MarshallingSupport.marshalChar(dataOut, ch);
- break;
- case DataConstants.DOUBLE:
- double doubleVal = Double.longBitsToDouble(buffer.readLong());
- MarshallingSupport.marshalDouble(dataOut, doubleVal);
- break;
- case DataConstants.FLOAT:
- Float floatVal = Float.intBitsToFloat(buffer.readInt());
- MarshallingSupport.marshalFloat(dataOut, floatVal);
- break;
- case DataConstants.INT:
- MarshallingSupport.marshalInt(dataOut, buffer.readInt());
- break;
- case DataConstants.LONG:
- MarshallingSupport.marshalLong(dataOut, buffer.readLong());
- break;
- case DataConstants.SHORT:
- MarshallingSupport.marshalShort(dataOut, buffer.readShort());
- break;
- case DataConstants.STRING:
- String string = buffer.readNullableString();
- if (string == null) {
- MarshallingSupport.marshalNull(dataOut);
- } else {
- MarshallingSupport.marshalString(dataOut, string);
- }
- break;
- default:
- //now we stop
- stop = true;
- break;
- }
- dataOut.flush();
- }
+ MarshallingSupport.writeUTF8(dataOut, text.toString());
+ dataOut.flush();
+ bytes = bytesOut.toByteArray();
}
- bytes = bytesOut.toByteArray();
- } else if (coreType == org.apache.activemq.artemis.api.core.Message.BYTES_TYPE) {
- int n = buffer.readableBytes();
- bytes = new byte[n];
- buffer.readBytes(bytes);
+ }
+ } else if (coreType == org.apache.activemq.artemis.api.core.Message.MAP_TYPE) {
+ TypedProperties mapData = new TypedProperties();
+ //it could be a null map
+ if (buffer.readableBytes() > 0) {
+ mapData.decode(buffer.byteBuf());
+ Map<String, Object> map = mapData.getMap();
+ ByteArrayOutputStream out = new ByteArrayOutputStream(mapData.getEncodeSize());
+ OutputStream os = out;
if (isCompressed) {
- int length = bytes.length;
- Deflater deflater = new Deflater();
- try (org.apache.activemq.util.ByteArrayOutputStream compressed = new org.apache.activemq.util.ByteArrayOutputStream()) {
- compressed.write(new byte[4]);
- deflater.setInput(bytes);
- deflater.finish();
- byte[] bytesBuf = new byte[1024];
- while (!deflater.finished()) {
- int count = deflater.deflate(bytesBuf);
- compressed.write(bytesBuf, 0, count);
- }
- compressed.flush();
- ByteSequence byteSeq = compressed.toByteSequence();
- ByteSequenceData.writeIntBig(byteSeq, length);
- bytes = Arrays.copyOfRange(byteSeq.data, 0, byteSeq.length);
- } finally {
- deflater.end();
- }
+ os = new DeflaterOutputStream(os, true);
+ }
+ try (DataOutputStream dataOut = new DataOutputStream(os)) {
+ MarshallingSupport.marshalPrimitiveMap(map, dataOut);
+ dataOut.flush();
}
- } else {
- int n = buffer.readableBytes();
- bytes = new byte[n];
+ bytes = out.toByteArray();
+ }
+
+ } else if (coreType == org.apache.activemq.artemis.api.core.Message.OBJECT_TYPE) {
+ if (buffer.readableBytes() > 0) {
+ int len = buffer.readInt();
+ bytes = new byte[len];
buffer.readBytes(bytes);
if (isCompressed) {
- try (ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
- DeflaterOutputStream out = new DeflaterOutputStream(bytesOut, true)) {
+ ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
+ try (DeflaterOutputStream out = new DeflaterOutputStream(bytesOut, true)) {
out.write(bytes);
out.flush();
- bytes = bytesOut.toByteArray();
}
+ bytes = bytesOut.toByteArray();
+ }
+ }
+ } else if (coreType == org.apache.activemq.artemis.api.core.Message.STREAM_TYPE) {
+ org.apache.activemq.util.ByteArrayOutputStream bytesOut = new org.apache.activemq.util.ByteArrayOutputStream();
+ OutputStream out = bytesOut;
+ if (isCompressed) {
+ out = new DeflaterOutputStream(bytesOut, true);
+ }
+ try (DataOutputStream dataOut = new DataOutputStream(out)) {
+
+ boolean stop = false;
+ while (!stop && buffer.readable()) {
+ byte primitiveType = buffer.readByte();
+ switch (primitiveType) {
+ case DataConstants.BOOLEAN:
+ MarshallingSupport.marshalBoolean(dataOut, buffer.readBoolean());
+ break;
+ case DataConstants.BYTE:
+ MarshallingSupport.marshalByte(dataOut, buffer.readByte());
+ break;
+ case DataConstants.BYTES:
+ int len = buffer.readInt();
+ byte[] bytesData = new byte[len];
+ buffer.readBytes(bytesData);
+ MarshallingSupport.marshalByteArray(dataOut, bytesData);
+ break;
+ case DataConstants.CHAR:
+ char ch = (char) buffer.readShort();
+ MarshallingSupport.marshalChar(dataOut, ch);
+ break;
+ case DataConstants.DOUBLE:
+ double doubleVal = Double.longBitsToDouble(buffer.readLong());
+ MarshallingSupport.marshalDouble(dataOut, doubleVal);
+ break;
+ case DataConstants.FLOAT:
+ Float floatVal = Float.intBitsToFloat(buffer.readInt());
+ MarshallingSupport.marshalFloat(dataOut, floatVal);
+ break;
+ case DataConstants.INT:
+ MarshallingSupport.marshalInt(dataOut, buffer.readInt());
+ break;
+ case DataConstants.LONG:
+ MarshallingSupport.marshalLong(dataOut, buffer.readLong());
+ break;
+ case DataConstants.SHORT:
+ MarshallingSupport.marshalShort(dataOut, buffer.readShort());
+ break;
+ case DataConstants.STRING:
+ String string = buffer.readNullableString();
+ if (string == null) {
+ MarshallingSupport.marshalNull(dataOut);
+ } else {
+ MarshallingSupport.marshalString(dataOut, string);
+ }
+ break;
+ default:
+ //now we stop
+ stop = true;
+ break;
+ }
+ dataOut.flush();
+ }
+ }
+ bytes = bytesOut.toByteArray();
+ } else if (coreType == org.apache.activemq.artemis.api.core.Message.BYTES_TYPE) {
+ int n = buffer.readableBytes();
+ bytes = new byte[n];
+ buffer.readBytes(bytes);
+ if (isCompressed) {
+ int length = bytes.length;
+ Deflater deflater = new Deflater();
+ try (org.apache.activemq.util.ByteArrayOutputStream compressed = new org.apache.activemq.util.ByteArrayOutputStream()) {
+ compressed.write(new byte[4]);
+ deflater.setInput(bytes);
+ deflater.finish();
+ byte[] bytesBuf = new byte[1024];
+ while (!deflater.finished()) {
+ int count = deflater.deflate(bytesBuf);
+ compressed.write(bytesBuf, 0, count);
+ }
+ compressed.flush();
+ ByteSequence byteSeq = compressed.toByteSequence();
+ ByteSequenceData.writeIntBig(byteSeq, length);
+ bytes = Arrays.copyOfRange(byteSeq.data, 0, byteSeq.length);
+ } finally {
+ deflater.end();
+ }
+ }
+ } else {
+ int n = buffer.readableBytes();
+ bytes = new byte[n];
+ buffer.readBytes(bytes);
+ if (isCompressed) {
+ try (ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
+ DeflaterOutputStream out = new DeflaterOutputStream(bytesOut, true)) {
+ out.write(bytes);
+ out.flush();
+ bytes = bytesOut.toByteArray();
}
}
-
- buffer.resetReaderIndex();// this is important for topics as the buffer
- // may be read multiple times
}
+
+ buffer.resetReaderIndex();// this is important for topics as the buffer
+ // may be read multiple times
}
//we need check null because messages may come from other clients
@@ -767,7 +768,7 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
}
amqMsg.setCommandId(commandId);
- SimpleString corrId = (SimpleString) coreMessage.getObjectProperty("JMSCorrelationID");
+ SimpleString corrId = (SimpleString) coreMessage.getObjectProperty(JMS_CORRELATION_ID_PROPERTY);
if (corrId != null) {
amqMsg.setCorrelationId(corrId.toString());
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64724c35/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
index 57865b7..77051cc 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
@@ -286,7 +286,7 @@ public class AMQConsumer {
for (MessageReference ref : ackList) {
Throwable poisonCause = ack.getPoisonCause();
if (poisonCause != null) {
- ref.getMessage().putStringProperty(OpenWireMessageConverter.AMQ_MSG_DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY, poisonCause.toString());
+ ref.getMessage().putStringProperty(OpenWireMessageConverter.AMQ_MSG_DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY, new SimpleString(poisonCause.toString()));
}
ref.getQueue().sendToDeadLetterAddress(transaction, ref);
}