You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2018/01/22 23:06:05 UTC
[01/10] activemq-artemis git commit: ARTEMIS-1616 OpenWire
improvements
Repository: activemq-artemis
Updated Branches:
refs/heads/master 04a9884d3 -> b74018192
ARTEMIS-1616 OpenWire improvements
Refactored OpenWireMessageConverter::inbound into a private static method
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/0387b1a8
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/0387b1a8
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/0387b1a8
Branch: refs/heads/master
Commit: 0387b1a8427673f42166df5651bc504d38f365ce
Parents: c6b6dd9
Author: Francesco Nigro <ni...@gmail.com>
Authored: Mon Jan 22 19:57:29 2018 +0100
Committer: Clebert Suconic <cl...@apache.org>
Committed: Mon Jan 22 18:02:03 2018 -0500
----------------------------------------------------------------------
.../core/protocol/openwire/OpenWireMessageConverter.java | 10 +++++++---
1 file changed, 7 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0387b1a8/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
index 3dc4a4e..19dc802 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
@@ -120,10 +120,14 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
return null;
}
- // @Override
- public org.apache.activemq.artemis.api.core.Message inbound(Object message, CoreMessageObjectPools coreMessageObjectPools) throws Exception {
+ public org.apache.activemq.artemis.api.core.Message inbound(Message message, CoreMessageObjectPools coreMessageObjectPools) throws Exception {
+ return inbound(message, marshaller, coreMessageObjectPools);
+ }
+
+ private static org.apache.activemq.artemis.api.core.Message inbound(final Message messageSend,
+ final WireFormat marshaller,
+ final CoreMessageObjectPools coreMessageObjectPools) throws Exception {
- final Message messageSend = (Message) message;
final CoreMessage coreMessage = new CoreMessage(-1, messageSend.getSize(), coreMessageObjectPools);
final String type = messageSend.getType();
[03/10] activemq-artemis git commit: ARTEMIS-1616 OpenWire
improvements
Posted by cl...@apache.org.
ARTEMIS-1616 OpenWire improvements
Added a cache of existing queues to avoid repeated, expensive AMQSession::checkAutoCreateQueue calls
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/17c0a331
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/17c0a331
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/17c0a331
Branch: refs/heads/master
Commit: 17c0a331aceda16460cc4d87f376c8730e479fe5
Parents: 051a3ca
Author: Francesco Nigro <ni...@gmail.com>
Authored: Tue Jan 16 21:27:22 2018 +0100
Committer: Clebert Suconic <cl...@apache.org>
Committed: Mon Jan 22 18:02:03 2018 -0500
----------------------------------------------------------------------
.../core/protocol/openwire/amq/AMQSession.java | 37 ++++++++++++++++++--
1 file changed, 34 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/17c0a331/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
index 3ff3ae1..ad15fe7 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
@@ -89,6 +89,8 @@ public class AMQSession implements SessionCallback {
private final CoreMessageObjectPools coreMessageObjectPools = new CoreMessageObjectPools();
+ private String[] existingQueuesCache;
+
public AMQSession(ConnectionInfo connInfo,
SessionInfo sessInfo,
ActiveMQServer server,
@@ -105,6 +107,7 @@ public class AMQSession implements SessionCallback {
this.converter = new OpenWireMessageConverter(marshaller.copy());
this.enableAutoReadAndTtl = this::enableAutoReadAndTtl;
+ this.existingQueuesCache = null;
}
public boolean isClosed() {
@@ -193,6 +196,33 @@ public class AMQSession implements SessionCallback {
return consumersList;
}
+ private boolean checkCachedExistingQueues(final SimpleString address,
+ final String physicalName,
+ final boolean isTemporary) throws Exception {
+ String[] existingQueuesCache = this.existingQueuesCache;
+ //lazy allocation of the cache
+ if (existingQueuesCache == null) {
+ //16 means 64 bytes with 32 bit references or 128 bytes with 64 bit references -> 1 or 2 cache lines with common archs
+ existingQueuesCache = new String[16];
+ assert (Integer.bitCount(existingQueuesCache.length) == 1) : "existingQueuesCache.length must be power of 2";
+ this.existingQueuesCache = existingQueuesCache;
+ }
+ final int hashCode = physicalName.hashCode();
+ //this.existingQueuesCache.length must be power of 2
+ final int mask = existingQueuesCache.length - 1;
+ final int index = hashCode & mask;
+ final String existingQueue = existingQueuesCache[index];
+ if (existingQueue != null && existingQueue.equals(physicalName)) {
+ //if the information is stale (ie no longer valid) it will fail later
+ return true;
+ }
+ final boolean hasQueue = checkAutoCreateQueue(address, isTemporary);
+ if (hasQueue) {
+ existingQueuesCache[index] = physicalName;
+ }
+ return hasQueue;
+ }
+
private boolean checkAutoCreateQueue(SimpleString queueName, boolean isTemporary) throws Exception {
boolean hasQueue = true;
if (!connection.containsKnownDestination(queueName)) {
@@ -350,7 +380,7 @@ public class AMQSession implements SessionCallback {
originalCoreMsg.putStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID.toString(), messageSend.getMessageId().toString());
}
- boolean shouldBlockProducer = producerInfo.getWindowSize() > 0 || messageSend.isResponseRequired();
+ final boolean shouldBlockProducer = producerInfo.getWindowSize() > 0 || messageSend.isResponseRequired();
final AtomicInteger count = new AtomicInteger(actualDestinations.length);
@@ -361,13 +391,14 @@ public class AMQSession implements SessionCallback {
for (int i = 0, actualDestinationsCount = actualDestinations.length; i < actualDestinationsCount; i++) {
final ActiveMQDestination dest = actualDestinations[i];
- final SimpleString address = SimpleString.toSimpleString(dest.getPhysicalName(), coreMessageObjectPools.getAddressStringSimpleStringPool());
+ final String physicalName = dest.getPhysicalName();
+ final SimpleString address = SimpleString.toSimpleString(physicalName, coreMessageObjectPools.getAddressStringSimpleStringPool());
//the last coreMsg could be directly the original one -> it avoid 1 copy if actualDestinations > 1 and ANY copy if actualDestinations == 1
final org.apache.activemq.artemis.api.core.Message coreMsg = (i == actualDestinationsCount - 1) ? originalCoreMsg : originalCoreMsg.copy();
coreMsg.setAddress(address);
if (dest.isQueue()) {
- checkAutoCreateQueue(address, dest.isTemporary());
+ checkCachedExistingQueues(address, physicalName, dest.isTemporary());
coreMsg.setRoutingType(RoutingType.ANYCAST);
} else {
coreMsg.setRoutingType(RoutingType.MULTICAST);
[10/10] activemq-artemis git commit: This closes #1786
Posted by cl...@apache.org.
This closes #1786
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/b7401819
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/b7401819
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/b7401819
Branch: refs/heads/master
Commit: b7401819295e9ff32fb023beb22087afbdbaa4e9
Parents: 04a9884 0387b1a
Author: Clebert Suconic <cl...@apache.org>
Authored: Mon Jan 22 18:02:04 2018 -0500
Committer: Clebert Suconic <cl...@apache.org>
Committed: Mon Jan 22 18:02:04 2018 -0500
----------------------------------------------------------------------
.../activemq/artemis/api/core/SimpleString.java | 65 ++
.../openwire/OpenWireMessageConverter.java | 1083 ++++++++++--------
.../core/protocol/openwire/amq/AMQConsumer.java | 11 +-
.../core/protocol/openwire/amq/AMQSession.java | 70 +-
4 files changed, 748 insertions(+), 481 deletions(-)
----------------------------------------------------------------------
[07/10] activemq-artemis git commit: ARTEMIS-1616 OpenWire
improvements
Posted by cl...@apache.org.
ARTEMIS-1616 OpenWire improvements
Used SimpleString in AMQSession for the HDR_DUPLICATE_DETECTION_ID and CONNECTION_ID_PROPERTY_NAME properties
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/e7a1dca7
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/e7a1dca7
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/e7a1dca7
Branch: refs/heads/master
Commit: e7a1dca7b54bc47002b5d743115c1c790ead9579
Parents: 9650c80
Author: Francesco Nigro <ni...@gmail.com>
Authored: Wed Jan 17 08:29:28 2018 +0100
Committer: Clebert Suconic <cl...@apache.org>
Committed: Mon Jan 22 18:02:03 2018 -0500
----------------------------------------------------------------------
.../activemq/artemis/core/protocol/openwire/amq/AMQSession.java | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e7a1dca7/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
index ad15fe7..d6fe390 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
@@ -370,14 +370,14 @@ public class AMQSession implements SessionCallback {
final org.apache.activemq.artemis.api.core.Message originalCoreMsg = getConverter().inbound(messageSend, coreMessageObjectPools);
- originalCoreMsg.putStringProperty(MessageUtil.CONNECTION_ID_PROPERTY_NAME.toString(), this.connection.getState().getInfo().getClientId());
+ originalCoreMsg.putStringProperty(MessageUtil.CONNECTION_ID_PROPERTY_NAME, SimpleString.toSimpleString(this.connection.getState().getInfo().getClientId()));
/* ActiveMQ failover transport will attempt to reconnect after connection failure. Any sent messages that did
* not receive acks will be resent. (ActiveMQ broker handles this by returning a last sequence id received to
* the client). To handle this in Artemis we use a duplicate ID cache. To do this we check to see if the
* message comes from failover connection. If so we add a DUPLICATE_ID to handle duplicates after a resend. */
if (connection.getContext().isFaultTolerant() && !messageSend.getProperties().containsKey(org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID.toString())) {
- originalCoreMsg.putStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID.toString(), messageSend.getMessageId().toString());
+ originalCoreMsg.putStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID, SimpleString.toSimpleString(messageSend.getMessageId().toString()));
}
final boolean shouldBlockProducer = producerInfo.getWindowSize() > 0 || messageSend.isResponseRequired();
[04/10] activemq-artemis git commit: ARTEMIS-1616 OpenWire
improvements
Posted by cl...@apache.org.
ARTEMIS-1616 OpenWire improvements
Avoided copy of CoreMessage when not needed and cached lambda on hot path
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/2db4eafc
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/2db4eafc
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/2db4eafc
Branch: refs/heads/master
Commit: 2db4eafc4d81cd18f9394e356ee11b202f5f2301
Parents: 04a9884
Author: Francesco Nigro <ni...@gmail.com>
Authored: Tue Jan 16 14:24:32 2018 +0100
Committer: Clebert Suconic <cl...@apache.org>
Committed: Mon Jan 22 18:02:03 2018 -0500
----------------------------------------------------------------------
.../core/protocol/openwire/amq/AMQSession.java | 33 +++++++++++---------
1 file changed, 19 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2db4eafc/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
index d284d6c..3ff3ae1 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
@@ -85,6 +85,8 @@ public class AMQSession implements SessionCallback {
private final OpenWireProtocolManager protocolManager;
+ private final Runnable enableAutoReadAndTtl;
+
private final CoreMessageObjectPools coreMessageObjectPools = new CoreMessageObjectPools();
public AMQSession(ConnectionInfo connInfo,
@@ -102,6 +104,7 @@ public class AMQSession implements SessionCallback {
OpenWireFormat marshaller = (OpenWireFormat) connection.getMarshaller();
this.converter = new OpenWireMessageConverter(marshaller.copy());
+ this.enableAutoReadAndTtl = this::enableAutoReadAndTtl;
}
public boolean isClosed() {
@@ -325,7 +328,7 @@ public class AMQSession implements SessionCallback {
boolean sendProducerAck) throws Exception {
messageSend.setBrokerInTime(System.currentTimeMillis());
- ActiveMQDestination destination = messageSend.getDestination();
+ final ActiveMQDestination destination = messageSend.getDestination();
ActiveMQDestination[] actualDestinations = null;
if (destination.isComposite()) {
@@ -335,7 +338,7 @@ public class AMQSession implements SessionCallback {
actualDestinations = new ActiveMQDestination[]{destination};
}
- org.apache.activemq.artemis.api.core.Message originalCoreMsg = getConverter().inbound(messageSend, coreMessageObjectPools);
+ final org.apache.activemq.artemis.api.core.Message originalCoreMsg = getConverter().inbound(messageSend, coreMessageObjectPools);
originalCoreMsg.putStringProperty(MessageUtil.CONNECTION_ID_PROPERTY_NAME.toString(), this.connection.getState().getInfo().getClientId());
@@ -356,14 +359,15 @@ public class AMQSession implements SessionCallback {
connection.getContext().setDontSendReponse(true);
}
- for (int i = 0; i < actualDestinations.length; i++) {
- ActiveMQDestination dest = actualDestinations[i];
- SimpleString address = SimpleString.toSimpleString(dest.getPhysicalName(), coreMessageObjectPools.getAddressStringSimpleStringPool());
- org.apache.activemq.artemis.api.core.Message coreMsg = originalCoreMsg.copy();
+ for (int i = 0, actualDestinationsCount = actualDestinations.length; i < actualDestinationsCount; i++) {
+ final ActiveMQDestination dest = actualDestinations[i];
+ final SimpleString address = SimpleString.toSimpleString(dest.getPhysicalName(), coreMessageObjectPools.getAddressStringSimpleStringPool());
+ //the last coreMsg could be directly the original one -> it avoid 1 copy if actualDestinations > 1 and ANY copy if actualDestinations == 1
+ final org.apache.activemq.artemis.api.core.Message coreMsg = (i == actualDestinationsCount - 1) ? originalCoreMsg : originalCoreMsg.copy();
coreMsg.setAddress(address);
- if (actualDestinations[i].isQueue()) {
- checkAutoCreateQueue(SimpleString.toSimpleString(actualDestinations[i].getPhysicalName(), coreMessageObjectPools.getAddressStringSimpleStringPool()), actualDestinations[i].isTemporary());
+ if (dest.isQueue()) {
+ checkAutoCreateQueue(address, dest.isTemporary());
coreMsg.setRoutingType(RoutingType.ANYCAST);
} else {
coreMsg.setRoutingType(RoutingType.MULTICAST);
@@ -424,12 +428,8 @@ public class AMQSession implements SessionCallback {
//non-persistent messages goes here, by default we stop reading from
//transport
connection.getTransportConnection().setAutoRead(false);
- if (!store.checkMemory(() -> {
- connection.getTransportConnection().setAutoRead(true);
- connection.enableTtl();
- })) {
- connection.getTransportConnection().setAutoRead(true);
- connection.enableTtl();
+ if (!store.checkMemory(enableAutoReadAndTtl)) {
+ enableAutoReadAndTtl();
throw new ResourceAllocationException("Queue is full " + address);
}
@@ -448,6 +448,11 @@ public class AMQSession implements SessionCallback {
}
}
+ private void enableAutoReadAndTtl() {
+ connection.getTransportConnection().setAutoRead(true);
+ connection.enableTtl();
+ }
+
public String convertWildcard(String physicalName) {
return OPENWIRE_WILDCARD.convert(physicalName, server.getConfiguration().getWildcardConfiguration());
}
[08/10] activemq-artemis git commit: ARTEMIS-1616 OpenWire
improvements
Posted by cl...@apache.org.
ARTEMIS-1616 OpenWire improvements
Optimized SimpleString::split because it is heavily used in AddressImpl::new
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/051a3cae
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/051a3cae
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/051a3cae
Branch: refs/heads/master
Commit: 051a3cae49c488314b9a5ccaa36056a358e4b9df
Parents: 54d0161
Author: Francesco Nigro <ni...@gmail.com>
Authored: Tue Jan 16 18:40:06 2018 +0100
Committer: Clebert Suconic <cl...@apache.org>
Committed: Mon Jan 22 18:02:03 2018 -0500
----------------------------------------------------------------------
.../activemq/artemis/api/core/SimpleString.java | 65 ++++++++++++++++++++
1 file changed, 65 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/051a3cae/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/SimpleString.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/SimpleString.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/SimpleString.java
index dbf7468..b3af64a 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/SimpleString.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/SimpleString.java
@@ -18,6 +18,7 @@ package org.apache.activemq.artemis.api.core;
import java.io.Serializable;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import io.netty.buffer.ByteBuf;
@@ -34,6 +35,7 @@ import org.apache.activemq.artemis.utils.DataConstants;
*/
public final class SimpleString implements CharSequence, Serializable, Comparable<SimpleString> {
+ private static final SimpleString EMPTY = new SimpleString("");
private static final long serialVersionUID = 4204223851422244307L;
// Attributes
@@ -323,6 +325,14 @@ public final class SimpleString implements CharSequence, Serializable, Comparabl
* @return An array of SimpleStrings
*/
public SimpleString[] split(final char delim) {
+ if (this.str != null) {
+ return splitWithCachedString(this, delim);
+ } else {
+ return splitWithoutCachedString(delim);
+ }
+ }
+
+ private SimpleString[] splitWithoutCachedString(final char delim) {
List<SimpleString> all = null;
byte low = (byte) (delim & 0xFF); // low byte
@@ -361,6 +371,58 @@ public final class SimpleString implements CharSequence, Serializable, Comparabl
}
}
+ private static SimpleString[] splitWithCachedString(final SimpleString simpleString, final int delim) {
+ final String str = simpleString.str;
+ final byte[] data = simpleString.data;
+ final int length = str.length();
+ List<SimpleString> all = null;
+ int index = 0;
+ while (index < length) {
+ final int delimIndex = str.indexOf(delim, index);
+ if (delimIndex == -1) {
+ //just need to add the last one
+ break;
+ } else {
+ all = addSimpleStringPart(all, data, index, delimIndex);
+ }
+ index = delimIndex + 1;
+ }
+ if (all == null) {
+ return new SimpleString[]{simpleString};
+ } else {
+ // Adding the last one
+ all = addSimpleStringPart(all, data, index, length);
+ // Converting it to arrays
+ final SimpleString[] parts = new SimpleString[all.size()];
+ return all.toArray(parts);
+ }
+ }
+
+ private static List<SimpleString> addSimpleStringPart(List<SimpleString> all,
+ final byte[] data,
+ final int startIndex,
+ final int endIndex) {
+ final int expectedLength = endIndex - startIndex;
+ final SimpleString ss;
+ if (expectedLength == 0) {
+ ss = EMPTY;
+ } else {
+ //extract a byte[] copy from this
+ final int ssIndex = startIndex << 1;
+ final int delIndex = endIndex << 1;
+ final byte[] bytes = Arrays.copyOfRange(data, ssIndex, delIndex);
+ ss = new SimpleString(bytes);
+ }
+ // We will create the ArrayList lazily
+ if (all == null) {
+ // There will be at least 3 strings on this case (which is the actual common usecase)
+ // For that reason I'm allocating the ArrayList with 3 already
+ all = new ArrayList<>(3);
+ }
+ all.add(ss);
+ return all;
+ }
+
/**
* checks to see if this SimpleString contains the char parameter passed in
*
@@ -368,6 +430,9 @@ public final class SimpleString implements CharSequence, Serializable, Comparabl
* @return true if the char is found, false otherwise.
*/
public boolean contains(final char c) {
+ if (this.str != null) {
+ return this.str.indexOf(c) != -1;
+ }
final byte low = (byte) (c & 0xFF); // low byte
final byte high = (byte) (c >> 8 & 0xFF); // high byte
[02/10] activemq-artemis git commit: ARTEMIS-1616 OpenWire
improvements
Posted by cl...@apache.org.
ARTEMIS-1616 OpenWire improvements
Refactored OpenWireMessageConverter::toAMQMessage into smaller methods
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/c6b6dd95
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/c6b6dd95
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/c6b6dd95
Branch: refs/heads/master
Commit: c6b6dd95d1665230d667557df240d8a62a2118af
Parents: e7a1dca
Author: Francesco Nigro <ni...@gmail.com>
Authored: Wed Jan 17 14:37:08 2018 +0100
Committer: Clebert Suconic <cl...@apache.org>
Committed: Mon Jan 22 18:02:03 2018 -0500
----------------------------------------------------------------------
.../openwire/OpenWireMessageConverter.java | 562 +++++++++++--------
1 file changed, 316 insertions(+), 246 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c6b6dd95/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
index 0948f8a..3dc4a4e 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
@@ -120,7 +120,7 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
return null;
}
-// @Override
+ // @Override
public org.apache.activemq.artemis.api.core.Message inbound(Object message, CoreMessageObjectPools coreMessageObjectPools) throws Exception {
final Message messageSend = (Message) message;
@@ -205,7 +205,9 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
final ProducerId producerId = messageSend.getProducerId();
if (producerId != null) {
- putMsgProducerId(producerId, marshaller, coreMessage);
+ final ByteSequence producerIdBytes = marshaller.marshal(producerId);
+ producerIdBytes.compact();
+ coreMessage.putBytesProperty(AMQ_MSG_PRODUCER_ID, producerIdBytes.data);
}
final ByteSequence propBytes = messageSend.getMarshalledProperties();
if (propBytes != null) {
@@ -437,14 +439,6 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
coreMessage.putBytesProperty(AMQ_MSG_DATASTRUCTURE, dsBytes.data);
}
- private static void putMsgProducerId(final ProducerId producerId,
- final WireFormat marshaller,
- final CoreMessage coreMessage) throws IOException {
- final ByteSequence producerIdBytes = marshaller.marshal(producerId);
- producerIdBytes.compact();
- coreMessage.putBytesProperty(AMQ_MSG_PRODUCER_ID, producerIdBytes.data);
- }
-
private static void putMsgMarshalledProperties(final ByteSequence propBytes,
final Message messageSend,
final CoreMessage coreMessage) throws IOException {
@@ -512,9 +506,9 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
}
public MessageDispatch createMessageDispatch(MessageReference reference,
- ICoreMessage message,
- AMQConsumer consumer) throws IOException, JMSException {
- ActiveMQMessage amqMessage = toAMQMessage(reference, message, consumer);
+ ICoreMessage message,
+ AMQConsumer consumer) throws IOException, JMSException {
+ ActiveMQMessage amqMessage = toAMQMessage(reference, message, marshaller, consumer);
//we can use core message id for sequenceId
amqMessage.getMessageId().setBrokerSequenceId(message.getMessageID());
@@ -529,35 +523,48 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
return md;
}
- private ActiveMQMessage toAMQMessage(MessageReference reference,
+ private static ActiveMQMessage toAMQMessage(MessageReference reference,
ICoreMessage coreMessage,
+ WireFormat marshaller,
AMQConsumer consumer) throws IOException {
- ActiveMQMessage amqMsg = null;
- byte coreType = coreMessage.getType();
+ final ActiveMQMessage amqMsg;
+ final byte coreType = coreMessage.getType();
+ final Boolean compressProp = (Boolean) coreMessage.getObjectProperty(AMQ_MSG_COMPRESSED);
+ final boolean isCompressed = compressProp == null ? false : compressProp.booleanValue();
+ final byte[] bytes;
+ final ActiveMQBuffer buffer = coreMessage.getReadOnlyBodyBuffer();
+ buffer.resetReaderIndex();
+
switch (coreType) {
case org.apache.activemq.artemis.api.core.Message.BYTES_TYPE:
amqMsg = new ActiveMQBytesMessage();
+ bytes = toAMQMessageBytesType(buffer, isCompressed);
break;
case org.apache.activemq.artemis.api.core.Message.MAP_TYPE:
amqMsg = new ActiveMQMapMessage();
+ bytes = toAMQMessageMapType(buffer, isCompressed);
break;
case org.apache.activemq.artemis.api.core.Message.OBJECT_TYPE:
amqMsg = new ActiveMQObjectMessage();
+ bytes = toAMQMessageObjectType(buffer, isCompressed);
break;
case org.apache.activemq.artemis.api.core.Message.STREAM_TYPE:
amqMsg = new ActiveMQStreamMessage();
+ bytes = toAMQMessageStreamType(buffer, isCompressed);
break;
case org.apache.activemq.artemis.api.core.Message.TEXT_TYPE:
amqMsg = new ActiveMQTextMessage();
+ bytes = toAMQMessageTextType(buffer, isCompressed);
break;
case org.apache.activemq.artemis.api.core.Message.DEFAULT_TYPE:
amqMsg = new ActiveMQMessage();
+ bytes = toAMQMessageDefaultType(buffer, isCompressed);
break;
default:
throw new IllegalStateException("Unknown message type: " + coreMessage.getType());
}
- String type = coreMessage.getStringProperty(JMS_TYPE_PROPERTY);
+ final String type = coreMessage.getStringProperty(JMS_TYPE_PROPERTY);
if (type != null) {
amqMsg.setJMSType(type);
}
@@ -572,165 +579,8 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
}
amqMsg.setBrokerInTime(brokerInTime);
- ActiveMQBuffer buffer = coreMessage.getReadOnlyBodyBuffer();
- Boolean compressProp = (Boolean) coreMessage.getObjectProperty(AMQ_MSG_COMPRESSED);
- boolean isCompressed = compressProp == null ? false : compressProp.booleanValue();
amqMsg.setCompressed(isCompressed);
- byte[] bytes = null;
- if (buffer != null) {
- buffer.resetReaderIndex();
-
- if (coreType == org.apache.activemq.artemis.api.core.Message.TEXT_TYPE) {
- SimpleString text = buffer.readNullableSimpleString();
- if (text != null) {
- ByteArrayOutputStream bytesOut = new ByteArrayOutputStream(text.length() + 4);
- OutputStream out = bytesOut;
- if (isCompressed) {
- out = new DeflaterOutputStream(out, true);
- }
- try (DataOutputStream dataOut = new DataOutputStream(out)) {
- MarshallingSupport.writeUTF8(dataOut, text.toString());
- dataOut.flush();
- bytes = bytesOut.toByteArray();
- }
- }
- } else if (coreType == org.apache.activemq.artemis.api.core.Message.MAP_TYPE) {
- TypedProperties mapData = new TypedProperties();
- //it could be a null map
- if (buffer.readableBytes() > 0) {
- mapData.decode(buffer.byteBuf());
- Map<String, Object> map = mapData.getMap();
- ByteArrayOutputStream out = new ByteArrayOutputStream(mapData.getEncodeSize());
- OutputStream os = out;
- if (isCompressed) {
- os = new DeflaterOutputStream(os, true);
- }
- try (DataOutputStream dataOut = new DataOutputStream(os)) {
- MarshallingSupport.marshalPrimitiveMap(map, dataOut);
- dataOut.flush();
- }
- bytes = out.toByteArray();
- }
-
- } else if (coreType == org.apache.activemq.artemis.api.core.Message.OBJECT_TYPE) {
- if (buffer.readableBytes() > 0) {
- int len = buffer.readInt();
- bytes = new byte[len];
- buffer.readBytes(bytes);
- if (isCompressed) {
- ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
- try (DeflaterOutputStream out = new DeflaterOutputStream(bytesOut, true)) {
- out.write(bytes);
- out.flush();
- }
- bytes = bytesOut.toByteArray();
- }
- }
- } else if (coreType == org.apache.activemq.artemis.api.core.Message.STREAM_TYPE) {
- org.apache.activemq.util.ByteArrayOutputStream bytesOut = new org.apache.activemq.util.ByteArrayOutputStream();
- OutputStream out = bytesOut;
- if (isCompressed) {
- out = new DeflaterOutputStream(bytesOut, true);
- }
- try (DataOutputStream dataOut = new DataOutputStream(out)) {
-
- boolean stop = false;
- while (!stop && buffer.readable()) {
- byte primitiveType = buffer.readByte();
- switch (primitiveType) {
- case DataConstants.BOOLEAN:
- MarshallingSupport.marshalBoolean(dataOut, buffer.readBoolean());
- break;
- case DataConstants.BYTE:
- MarshallingSupport.marshalByte(dataOut, buffer.readByte());
- break;
- case DataConstants.BYTES:
- int len = buffer.readInt();
- byte[] bytesData = new byte[len];
- buffer.readBytes(bytesData);
- MarshallingSupport.marshalByteArray(dataOut, bytesData);
- break;
- case DataConstants.CHAR:
- char ch = (char) buffer.readShort();
- MarshallingSupport.marshalChar(dataOut, ch);
- break;
- case DataConstants.DOUBLE:
- double doubleVal = Double.longBitsToDouble(buffer.readLong());
- MarshallingSupport.marshalDouble(dataOut, doubleVal);
- break;
- case DataConstants.FLOAT:
- Float floatVal = Float.intBitsToFloat(buffer.readInt());
- MarshallingSupport.marshalFloat(dataOut, floatVal);
- break;
- case DataConstants.INT:
- MarshallingSupport.marshalInt(dataOut, buffer.readInt());
- break;
- case DataConstants.LONG:
- MarshallingSupport.marshalLong(dataOut, buffer.readLong());
- break;
- case DataConstants.SHORT:
- MarshallingSupport.marshalShort(dataOut, buffer.readShort());
- break;
- case DataConstants.STRING:
- String string = buffer.readNullableString();
- if (string == null) {
- MarshallingSupport.marshalNull(dataOut);
- } else {
- MarshallingSupport.marshalString(dataOut, string);
- }
- break;
- default:
- //now we stop
- stop = true;
- break;
- }
- dataOut.flush();
- }
- }
- bytes = bytesOut.toByteArray();
- } else if (coreType == org.apache.activemq.artemis.api.core.Message.BYTES_TYPE) {
- int n = buffer.readableBytes();
- bytes = new byte[n];
- buffer.readBytes(bytes);
- if (isCompressed) {
- int length = bytes.length;
- Deflater deflater = new Deflater();
- try (org.apache.activemq.util.ByteArrayOutputStream compressed = new org.apache.activemq.util.ByteArrayOutputStream()) {
- compressed.write(new byte[4]);
- deflater.setInput(bytes);
- deflater.finish();
- byte[] bytesBuf = new byte[1024];
- while (!deflater.finished()) {
- int count = deflater.deflate(bytesBuf);
- compressed.write(bytesBuf, 0, count);
- }
- compressed.flush();
- ByteSequence byteSeq = compressed.toByteSequence();
- ByteSequenceData.writeIntBig(byteSeq, length);
- bytes = Arrays.copyOfRange(byteSeq.data, 0, byteSeq.length);
- } finally {
- deflater.end();
- }
- }
- } else {
- int n = buffer.readableBytes();
- bytes = new byte[n];
- buffer.readBytes(bytes);
- if (isCompressed) {
- try (ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
- DeflaterOutputStream out = new DeflaterOutputStream(bytesOut, true)) {
- out.write(bytes);
- out.flush();
- bytes = bytesOut.toByteArray();
- }
- }
- }
-
- buffer.resetReaderIndex();// this is important for topics as the buffer
- // may be read multiple times
- }
-
//we need check null because messages may come from other clients
//and those amq specific attribute may not be set.
Long arrival = (Long) coreMessage.getObjectProperty(AMQ_MSG_ARRIVAL);
@@ -740,24 +590,14 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
}
amqMsg.setArrival(arrival);
- String brokerPath = (String) coreMessage.getObjectProperty(AMQ_MSG_BROKER_PATH);
- if (brokerPath != null && brokerPath.isEmpty()) {
- String[] brokers = brokerPath.split(",");
- BrokerId[] bids = new BrokerId[brokers.length];
- for (int i = 0; i < bids.length; i++) {
- bids[i] = new BrokerId(brokers[i]);
- }
- amqMsg.setBrokerPath(bids);
+ final String brokerPath = (String) coreMessage.getObjectProperty(AMQ_MSG_BROKER_PATH);
+ if (brokerPath != null && !brokerPath.isEmpty()) {
+ setAMQMsgBrokerPath(amqMsg, brokerPath);
}
- String clusterPath = (String) coreMessage.getObjectProperty(AMQ_MSG_CLUSTER);
- if (clusterPath != null && clusterPath.isEmpty()) {
- String[] cluster = clusterPath.split(",");
- BrokerId[] bids = new BrokerId[cluster.length];
- for (int i = 0; i < bids.length; i++) {
- bids[i] = new BrokerId(cluster[i]);
- }
- amqMsg.setCluster(bids);
+ final String clusterPath = (String) coreMessage.getObjectProperty(AMQ_MSG_CLUSTER);
+ if (clusterPath != null && !clusterPath.isEmpty()) {
+ setAMQMsgClusterPath(amqMsg, clusterPath);
}
Integer commandId = (Integer) coreMessage.getObjectProperty(AMQ_MSG_COMMAND_ID);
@@ -766,21 +606,19 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
}
amqMsg.setCommandId(commandId);
- SimpleString corrId = (SimpleString) coreMessage.getObjectProperty(JMS_CORRELATION_ID_PROPERTY);
+ final SimpleString corrId = (SimpleString) coreMessage.getObjectProperty(JMS_CORRELATION_ID_PROPERTY);
if (corrId != null) {
amqMsg.setCorrelationId(corrId.toString());
}
- byte[] dsBytes = (byte[]) coreMessage.getObjectProperty(AMQ_MSG_DATASTRUCTURE);
+ final byte[] dsBytes = (byte[]) coreMessage.getObjectProperty(AMQ_MSG_DATASTRUCTURE);
if (dsBytes != null) {
- ByteSequence seq = new ByteSequence(dsBytes);
- DataStructure ds = (DataStructure) marshaller.unmarshal(seq);
- amqMsg.setDataStructure(ds);
+ setAMQMsgDataStructure(amqMsg, marshaller, dsBytes);
}
final ActiveMQDestination actualDestination = consumer.getOpenwireDestination();
amqMsg.setDestination(OpenWireUtil.toAMQAddress(coreMessage, actualDestination));
- Object value = coreMessage.getGroupID();
+ final Object value = coreMessage.getGroupID();
if (value != null) {
String groupId = value.toString();
amqMsg.setGroupID(groupId);
@@ -792,8 +630,8 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
}
amqMsg.setGroupSequence(groupSequence);
- MessageId mid = null;
- byte[] midBytes = (byte[]) coreMessage.getObjectProperty(AMQ_MSG_MESSAGE_ID);
+ final MessageId mid;
+ final byte[] midBytes = (byte[]) coreMessage.getObjectProperty(AMQ_MSG_MESSAGE_ID);
if (midBytes != null) {
ByteSequence midSeq = new ByteSequence(midBytes);
mid = (MessageId) marshaller.unmarshal(midSeq);
@@ -803,97 +641,329 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
amqMsg.setMessageId(mid);
- byte[] origDestBytes = (byte[]) coreMessage.getObjectProperty(AMQ_MSG_ORIG_DESTINATION);
+ final byte[] origDestBytes = (byte[]) coreMessage.getObjectProperty(AMQ_MSG_ORIG_DESTINATION);
if (origDestBytes != null) {
- ActiveMQDestination origDest = (ActiveMQDestination) marshaller.unmarshal(new ByteSequence(origDestBytes));
- amqMsg.setOriginalDestination(origDest);
+ setAMQMsgOriginalDestination(amqMsg, marshaller, origDestBytes);
}
- byte[] origTxIdBytes = (byte[]) coreMessage.getObjectProperty(AMQ_MSG_ORIG_TXID);
+ final byte[] origTxIdBytes = (byte[]) coreMessage.getObjectProperty(AMQ_MSG_ORIG_TXID);
if (origTxIdBytes != null) {
- TransactionId origTxId = (TransactionId) marshaller.unmarshal(new ByteSequence(origTxIdBytes));
- amqMsg.setOriginalTransactionId(origTxId);
+ setAMQMsgOriginalTransactionId(amqMsg, marshaller, origTxIdBytes);
}
- byte[] producerIdBytes = (byte[]) coreMessage.getObjectProperty(AMQ_MSG_PRODUCER_ID);
+ final byte[] producerIdBytes = (byte[]) coreMessage.getObjectProperty(AMQ_MSG_PRODUCER_ID);
if (producerIdBytes != null) {
ProducerId producerId = (ProducerId) marshaller.unmarshal(new ByteSequence(producerIdBytes));
amqMsg.setProducerId(producerId);
}
- byte[] marshalledBytes = (byte[]) coreMessage.getObjectProperty(AMQ_MSG_MARSHALL_PROP);
+ final byte[] marshalledBytes = (byte[]) coreMessage.getObjectProperty(AMQ_MSG_MARSHALL_PROP);
if (marshalledBytes != null) {
amqMsg.setMarshalledProperties(new ByteSequence(marshalledBytes));
}
amqMsg.setRedeliveryCounter(reference.getDeliveryCount() - 1);
- byte[] replyToBytes = (byte[]) coreMessage.getObjectProperty(AMQ_MSG_REPLY_TO);
+ final byte[] replyToBytes = (byte[]) coreMessage.getObjectProperty(AMQ_MSG_REPLY_TO);
if (replyToBytes != null) {
- ActiveMQDestination replyTo = (ActiveMQDestination) marshaller.unmarshal(new ByteSequence(replyToBytes));
- amqMsg.setReplyTo(replyTo);
+ setAMQMsgReplyTo(amqMsg, marshaller, replyToBytes);
}
- String userId = (String) coreMessage.getObjectProperty(AMQ_MSG_USER_ID);
+ final String userId = (String) coreMessage.getObjectProperty(AMQ_MSG_USER_ID);
if (userId != null) {
amqMsg.setUserID(userId);
}
- Boolean isDroppable = (Boolean) coreMessage.getObjectProperty(AMQ_MSG_DROPPABLE);
+ final Boolean isDroppable = (Boolean) coreMessage.getObjectProperty(AMQ_MSG_DROPPABLE);
if (isDroppable != null) {
amqMsg.setDroppable(isDroppable);
}
- SimpleString dlqCause = (SimpleString) coreMessage.getObjectProperty(AMQ_MSG_DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY);
+ final SimpleString dlqCause = (SimpleString) coreMessage.getObjectProperty(AMQ_MSG_DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY);
if (dlqCause != null) {
- try {
- amqMsg.setStringProperty(ActiveMQMessage.DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY, dlqCause.toString());
- } catch (JMSException e) {
- throw new IOException("failure to set dlq property " + dlqCause, e);
- }
+ setAMQMsgDlqDeliveryFailureCause(amqMsg, dlqCause);
}
- SimpleString lastValueProperty = coreMessage.getLastValueProperty();
+ final SimpleString lastValueProperty = coreMessage.getLastValueProperty();
if (lastValueProperty != null) {
- try {
- amqMsg.setStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_LAST_VALUE_NAME.toString(), lastValueProperty.toString());
- } catch (JMSException e) {
- throw new IOException("failure to set lvq property " + dlqCause, e);
- }
+ setAMQMsgHdrLastValueName(amqMsg, lastValueProperty);
}
- Set<SimpleString> props = coreMessage.getPropertyNames();
+ final Set<SimpleString> props = coreMessage.getPropertyNames();
if (props != null) {
- for (SimpleString s : props) {
- String keyStr = s.toString();
- if ((keyStr.startsWith("_AMQ") || keyStr.startsWith("__HDR_")) &&
- !consumer.hasNotificationDestination()) {
- continue;
+ setAMQMsgObjectProperties(amqMsg, coreMessage, props, consumer);
+ }
+
+ if (bytes != null) {
+ ByteSequence content = new ByteSequence(bytes);
+ amqMsg.setContent(content);
+ }
+ return amqMsg;
+ }
+
+ private static byte[] toAMQMessageTextType(final ActiveMQBuffer buffer,
+ final boolean isCompressed) throws IOException {
+ byte[] bytes = null;
+ SimpleString text = buffer.readNullableSimpleString();
+ if (text != null) {
+ ByteArrayOutputStream bytesOut = new ByteArrayOutputStream(text.length() + 4);
+ OutputStream out = bytesOut;
+ if (isCompressed) {
+ out = new DeflaterOutputStream(out, true);
+ }
+ try (DataOutputStream dataOut = new DataOutputStream(out)) {
+ MarshallingSupport.writeUTF8(dataOut, text.toString());
+ dataOut.flush();
+ bytes = bytesOut.toByteArray();
+ }
+ }
+ return bytes;
+ }
+
+ private static byte[] toAMQMessageMapType(final ActiveMQBuffer buffer,
+ final boolean isCompressed) throws IOException {
+ byte[] bytes = null;
+ //it could be a null map
+ if (buffer.readableBytes() > 0) {
+ TypedProperties mapData = new TypedProperties();
+ mapData.decode(buffer.byteBuf());
+ Map<String, Object> map = mapData.getMap();
+ ByteArrayOutputStream out = new ByteArrayOutputStream(mapData.getEncodeSize());
+ OutputStream os = out;
+ if (isCompressed) {
+ os = new DeflaterOutputStream(os, true);
+ }
+ try (DataOutputStream dataOut = new DataOutputStream(os)) {
+ MarshallingSupport.marshalPrimitiveMap(map, dataOut);
+ dataOut.flush();
+ }
+ bytes = out.toByteArray();
+ }
+ return bytes;
+ }
+
+ private static byte[] toAMQMessageObjectType(final ActiveMQBuffer buffer,
+ final boolean isCompressed) throws IOException {
+ byte[] bytes = null;
+ if (buffer.readableBytes() > 0) {
+ int len = buffer.readInt();
+ bytes = new byte[len];
+ buffer.readBytes(bytes);
+ if (isCompressed) {
+ ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
+ try (DeflaterOutputStream out = new DeflaterOutputStream(bytesOut, true)) {
+ out.write(bytes);
+ out.flush();
}
- Object prop = coreMessage.getObjectProperty(s);
- try {
- if (prop instanceof SimpleString) {
- amqMsg.setObjectProperty(s.toString(), prop.toString());
- } else {
- if (keyStr.equals(MessageUtil.JMSXDELIVERYCOUNT) && prop instanceof Long) {
- Long l = (Long) prop;
- amqMsg.setObjectProperty(s.toString(), l.intValue());
+ bytes = bytesOut.toByteArray();
+ }
+ }
+ return bytes;
+ }
+
+ private static byte[] toAMQMessageStreamType(final ActiveMQBuffer buffer,
+ final boolean isCompressed) throws IOException {
+ byte[] bytes;
+ org.apache.activemq.util.ByteArrayOutputStream bytesOut = new org.apache.activemq.util.ByteArrayOutputStream();
+ OutputStream out = bytesOut;
+ if (isCompressed) {
+ out = new DeflaterOutputStream(bytesOut, true);
+ }
+ try (DataOutputStream dataOut = new DataOutputStream(out)) {
+
+ boolean stop = false;
+ while (!stop && buffer.readable()) {
+ byte primitiveType = buffer.readByte();
+ switch (primitiveType) {
+ case DataConstants.BOOLEAN:
+ MarshallingSupport.marshalBoolean(dataOut, buffer.readBoolean());
+ break;
+ case DataConstants.BYTE:
+ MarshallingSupport.marshalByte(dataOut, buffer.readByte());
+ break;
+ case DataConstants.BYTES:
+ int len = buffer.readInt();
+ byte[] bytesData = new byte[len];
+ buffer.readBytes(bytesData);
+ MarshallingSupport.marshalByteArray(dataOut, bytesData);
+ break;
+ case DataConstants.CHAR:
+ char ch = (char) buffer.readShort();
+ MarshallingSupport.marshalChar(dataOut, ch);
+ break;
+ case DataConstants.DOUBLE:
+ double doubleVal = Double.longBitsToDouble(buffer.readLong());
+ MarshallingSupport.marshalDouble(dataOut, doubleVal);
+ break;
+ case DataConstants.FLOAT:
+ Float floatVal = Float.intBitsToFloat(buffer.readInt());
+ MarshallingSupport.marshalFloat(dataOut, floatVal);
+ break;
+ case DataConstants.INT:
+ MarshallingSupport.marshalInt(dataOut, buffer.readInt());
+ break;
+ case DataConstants.LONG:
+ MarshallingSupport.marshalLong(dataOut, buffer.readLong());
+ break;
+ case DataConstants.SHORT:
+ MarshallingSupport.marshalShort(dataOut, buffer.readShort());
+ break;
+ case DataConstants.STRING:
+ String string = buffer.readNullableString();
+ if (string == null) {
+ MarshallingSupport.marshalNull(dataOut);
} else {
- amqMsg.setObjectProperty(s.toString(), prop);
+ MarshallingSupport.marshalString(dataOut, string);
}
- }
- } catch (JMSException e) {
- throw new IOException("exception setting property " + s + " : " + prop, e);
+ break;
+ default:
+ //now we stop
+ stop = true;
+ break;
}
+ dataOut.flush();
}
}
+ bytes = bytesOut.toByteArray();
+ return bytes;
+ }
- amqMsg.setCompressed(isCompressed);
- if (bytes != null) {
- ByteSequence content = new ByteSequence(bytes);
- amqMsg.setContent(content);
+ private static byte[] toAMQMessageBytesType(final ActiveMQBuffer buffer,
+ final boolean isCompressed) throws IOException {
+ int n = buffer.readableBytes();
+ byte[] bytes = new byte[n];
+ buffer.readBytes(bytes);
+ if (isCompressed) {
+ bytes = toAMQMessageCompressedBytesType(bytes);
+ }
+ return bytes;
+ }
+
+ private static byte[] toAMQMessageCompressedBytesType(final byte[] bytes) throws IOException {
+ int length = bytes.length;
+ Deflater deflater = new Deflater();
+ try (org.apache.activemq.util.ByteArrayOutputStream compressed = new org.apache.activemq.util.ByteArrayOutputStream()) {
+ compressed.write(new byte[4]);
+ deflater.setInput(bytes);
+ deflater.finish();
+ byte[] bytesBuf = new byte[1024];
+ while (!deflater.finished()) {
+ int count = deflater.deflate(bytesBuf);
+ compressed.write(bytesBuf, 0, count);
+ }
+ compressed.flush();
+ ByteSequence byteSeq = compressed.toByteSequence();
+ ByteSequenceData.writeIntBig(byteSeq, length);
+ return Arrays.copyOfRange(byteSeq.data, 0, byteSeq.length);
+ } finally {
+ deflater.end();
}
- return amqMsg;
}
+ private static byte[] toAMQMessageDefaultType(final ActiveMQBuffer buffer,
+ final boolean isCompressed) throws IOException {
+ int n = buffer.readableBytes();
+ byte[] bytes = new byte[n];
+ buffer.readBytes(bytes);
+ if (isCompressed) {
+ try (ByteArrayOutputStream bytesOut = new ByteArrayOutputStream(); DeflaterOutputStream out = new DeflaterOutputStream(bytesOut, true)) {
+ out.write(bytes);
+ out.flush();
+ bytes = bytesOut.toByteArray();
+ }
+ }
+ return bytes;
+ }
+
+ private static void setAMQMsgBrokerPath(final ActiveMQMessage amqMsg, final String brokerPath) {
+ String[] brokers = brokerPath.split(",");
+ BrokerId[] bids = new BrokerId[brokers.length];
+ for (int i = 0; i < bids.length; i++) {
+ bids[i] = new BrokerId(brokers[i]);
+ }
+ amqMsg.setBrokerPath(bids);
+ }
+
+ private static void setAMQMsgClusterPath(final ActiveMQMessage amqMsg, final String clusterPath) {
+ String[] cluster = clusterPath.split(",");
+ BrokerId[] bids = new BrokerId[cluster.length];
+ for (int i = 0; i < bids.length; i++) {
+ bids[i] = new BrokerId(cluster[i]);
+ }
+ amqMsg.setCluster(bids);
+ }
+
+ private static void setAMQMsgDataStructure(final ActiveMQMessage amqMsg,
+ final WireFormat marshaller,
+ final byte[] dsBytes) throws IOException {
+ ByteSequence seq = new ByteSequence(dsBytes);
+ DataStructure ds = (DataStructure) marshaller.unmarshal(seq);
+ amqMsg.setDataStructure(ds);
+ }
+
+ private static void setAMQMsgOriginalDestination(final ActiveMQMessage amqMsg,
+ final WireFormat marshaller,
+ final byte[] origDestBytes) throws IOException {
+ ActiveMQDestination origDest = (ActiveMQDestination) marshaller.unmarshal(new ByteSequence(origDestBytes));
+ amqMsg.setOriginalDestination(origDest);
+ }
+
+ private static void setAMQMsgOriginalTransactionId(final ActiveMQMessage amqMsg,
+ final WireFormat marshaller,
+ final byte[] origTxIdBytes) throws IOException {
+ TransactionId origTxId = (TransactionId) marshaller.unmarshal(new ByteSequence(origTxIdBytes));
+ amqMsg.setOriginalTransactionId(origTxId);
+ }
+
+ private static void setAMQMsgReplyTo(final ActiveMQMessage amqMsg,
+ final WireFormat marshaller,
+ final byte[] replyToBytes) throws IOException {
+ ActiveMQDestination replyTo = (ActiveMQDestination) marshaller.unmarshal(new ByteSequence(replyToBytes));
+ amqMsg.setReplyTo(replyTo);
+ }
+
+ private static void setAMQMsgDlqDeliveryFailureCause(final ActiveMQMessage amqMsg,
+ final SimpleString dlqCause) throws IOException {
+ try {
+ amqMsg.setStringProperty(ActiveMQMessage.DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY, dlqCause.toString());
+ } catch (JMSException e) {
+ throw new IOException("failure to set dlq property " + dlqCause, e);
+ }
+ }
+
+ private static void setAMQMsgHdrLastValueName(final ActiveMQMessage amqMsg,
+ final SimpleString lastValueProperty) throws IOException {
+ try {
+ amqMsg.setStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_LAST_VALUE_NAME.toString(), lastValueProperty.toString());
+ } catch (JMSException e) {
+ throw new IOException("failure to set lvq property " + lastValueProperty, e);
+ }
+ }
+
+ private static void setAMQMsgObjectProperties(final ActiveMQMessage amqMsg,
+ final ICoreMessage coreMessage,
+ final Set<SimpleString> props,
+ final AMQConsumer consumer) throws IOException {
+ for (SimpleString s : props) {
+ final String keyStr = s.toString();
+ if (!consumer.hasNotificationDestination() && (keyStr.startsWith("_AMQ") || keyStr.startsWith("__HDR_"))) {
+ continue;
+ }
+ final Object prop = coreMessage.getObjectProperty(s);
+ try {
+ if (prop instanceof SimpleString) {
+ amqMsg.setObjectProperty(keyStr, prop.toString());
+ } else {
+ if (keyStr.equals(MessageUtil.JMSXDELIVERYCOUNT) && prop instanceof Long) {
+ Long l = (Long) prop;
+ amqMsg.setObjectProperty(keyStr, l.intValue());
+ } else {
+ amqMsg.setObjectProperty(keyStr, prop);
+ }
+ }
+ } catch (JMSException e) {
+ throw new IOException("exception setting property " + s + " : " + prop, e);
+ }
+ }
+ }
}
[09/10] activemq-artemis git commit: ARTEMIS-1616 OpenWire improvements
Posted by cl...@apache.org.
ARTEMIS-1616 OpenWire improvements
Cached Notification Destination check on AMQConsumer to avoid expensive ActiveMQDestination::toString
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/9650c80b
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/9650c80b
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/9650c80b
Branch: refs/heads/master
Commit: 9650c80ba701acbf727f379bd155551cdd663b47
Parents: 64724c3
Author: Francesco Nigro <ni...@gmail.com>
Authored: Wed Jan 17 00:37:24 2018 +0100
Committer: Clebert Suconic <cl...@apache.org>
Committed: Mon Jan 22 18:02:03 2018 -0500
----------------------------------------------------------------------
.../core/protocol/openwire/OpenWireMessageConverter.java | 10 ++++------
.../artemis/core/protocol/openwire/amq/AMQConsumer.java | 9 ++++++++-
2 files changed, 12 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9650c80b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
index 457593d..0948f8a 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
@@ -97,8 +97,6 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
private static final SimpleString AMQ_MSG_DROPPABLE = new SimpleString(AMQ_PREFIX + "DROPPABLE");
private static final SimpleString AMQ_MSG_COMPRESSED = new SimpleString(AMQ_PREFIX + "COMPRESSED");
- private static final String AMQ_NOTIFICATIONS_DESTINATION = "activemq.notifications";
-
private final WireFormat marshaller;
public OpenWireMessageConverter(WireFormat marshaller) {
@@ -516,7 +514,7 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
public MessageDispatch createMessageDispatch(MessageReference reference,
ICoreMessage message,
AMQConsumer consumer) throws IOException, JMSException {
- ActiveMQMessage amqMessage = toAMQMessage(reference, message, consumer.getOpenwireDestination());
+ ActiveMQMessage amqMessage = toAMQMessage(reference, message, consumer);
//we can use core message id for sequenceId
amqMessage.getMessageId().setBrokerSequenceId(message.getMessageID());
@@ -533,7 +531,7 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
private ActiveMQMessage toAMQMessage(MessageReference reference,
ICoreMessage coreMessage,
- ActiveMQDestination actualDestination) throws IOException {
+ AMQConsumer consumer) throws IOException {
ActiveMQMessage amqMsg = null;
byte coreType = coreMessage.getType();
switch (coreType) {
@@ -779,7 +777,7 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
DataStructure ds = (DataStructure) marshaller.unmarshal(seq);
amqMsg.setDataStructure(ds);
}
-
+ final ActiveMQDestination actualDestination = consumer.getOpenwireDestination();
amqMsg.setDestination(OpenWireUtil.toAMQAddress(coreMessage, actualDestination));
Object value = coreMessage.getGroupID();
@@ -869,7 +867,7 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
for (SimpleString s : props) {
String keyStr = s.toString();
if ((keyStr.startsWith("_AMQ") || keyStr.startsWith("__HDR_")) &&
- !(actualDestination.toString().contains(AMQ_NOTIFICATIONS_DESTINATION))) {
+ !consumer.hasNotificationDestination()) {
continue;
}
Object prop = coreMessage.getObjectProperty(s);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9650c80b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
index 77051cc..bfa51eb 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
@@ -53,8 +53,10 @@ import org.apache.activemq.command.MessagePull;
import org.apache.activemq.command.RemoveInfo;
public class AMQConsumer {
+ private static final String AMQ_NOTIFICATIONS_DESTINATION = "activemq.notifications";
private AMQSession session;
- private org.apache.activemq.command.ActiveMQDestination openwireDestination;
+ private final org.apache.activemq.command.ActiveMQDestination openwireDestination;
+ private final boolean hasNotificationDestination;
private ConsumerInfo info;
private final ScheduledExecutorService scheduledPool;
private ServerConsumer serverConsumer;
@@ -74,6 +76,7 @@ public class AMQConsumer {
boolean internalAddress) {
this.session = amqSession;
this.openwireDestination = d;
+ this.hasNotificationDestination = d.toString().contains(AMQ_NOTIFICATIONS_DESTINATION);
this.info = info;
this.scheduledPool = scheduledPool;
this.prefetchSize = info.getPrefetchSize();
@@ -331,6 +334,10 @@ public class AMQConsumer {
serverConsumer.close(false);
}
+ public boolean hasNotificationDestination() {
+ return hasNotificationDestination;
+ }
+
public org.apache.activemq.command.ActiveMQDestination getOpenwireDestination() {
return openwireDestination;
}
[05/10] activemq-artemis git commit: ARTEMIS-1616 OpenWire improvements
Posted by cl...@apache.org.
ARTEMIS-1616 OpenWire improvements
Refactored OpenWireMessageConverter::inbound into smaller methods
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/54d01618
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/54d01618
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/54d01618
Branch: refs/heads/master
Commit: 54d0161850bc4543839d4f36b46f3bd84f5cd8e1
Parents: 2db4eaf
Author: Francesco Nigro <ni...@gmail.com>
Authored: Tue Jan 16 17:01:54 2018 +0100
Committer: Clebert Suconic <cl...@apache.org>
Committed: Mon Jan 22 18:02:03 2018 -0500
----------------------------------------------------------------------
.../openwire/OpenWireMessageConverter.java | 474 +++++++++++--------
1 file changed, 280 insertions(+), 194 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/54d01618/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
index 83ff6d6..54f3c99 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
@@ -123,10 +123,10 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
// @Override
public org.apache.activemq.artemis.api.core.Message inbound(Object message, CoreMessageObjectPools coreMessageObjectPools) throws Exception {
- Message messageSend = (Message) message;
- CoreMessage coreMessage = new CoreMessage(-1, messageSend.getSize(), coreMessageObjectPools);
+ final Message messageSend = (Message) message;
+ final CoreMessage coreMessage = new CoreMessage(-1, messageSend.getSize(), coreMessageObjectPools);
- String type = messageSend.getType();
+ final String type = messageSend.getType();
if (type != null) {
coreMessage.putStringProperty(new SimpleString("JMSType"), new SimpleString(type));
}
@@ -135,264 +135,350 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
coreMessage.setPriority(messageSend.getPriority());
coreMessage.setTimestamp(messageSend.getTimestamp());
- byte coreType = toCoreType(messageSend.getDataStructureType());
+ final byte coreType = toCoreType(messageSend.getDataStructureType());
coreMessage.setType(coreType);
- ActiveMQBuffer body = coreMessage.getBodyBuffer();
+ final ActiveMQBuffer body = coreMessage.getBodyBuffer();
- ByteSequence contents = messageSend.getContent();
+ final ByteSequence contents = messageSend.getContent();
if (contents == null && coreType == org.apache.activemq.artemis.api.core.Message.TEXT_TYPE) {
body.writeNullableString(null);
} else if (contents != null) {
- boolean messageCompressed = messageSend.isCompressed();
+ final boolean messageCompressed = messageSend.isCompressed();
if (messageCompressed) {
coreMessage.putBooleanProperty(AMQ_MSG_COMPRESSED, messageCompressed);
}
switch (coreType) {
case org.apache.activemq.artemis.api.core.Message.TEXT_TYPE:
- InputStream tis = new ByteArrayInputStream(contents);
- if (messageCompressed) {
- tis = new InflaterInputStream(tis);
- }
- DataInputStream tdataIn = new DataInputStream(tis);
- String text = MarshallingSupport.readUTF8(tdataIn);
- tdataIn.close();
- body.writeNullableSimpleString(new SimpleString(text));
+ writeTextType(contents, messageCompressed, body);
break;
case org.apache.activemq.artemis.api.core.Message.MAP_TYPE:
- InputStream mis = new ByteArrayInputStream(contents);
- if (messageCompressed) {
- mis = new InflaterInputStream(mis);
- }
- DataInputStream mdataIn = new DataInputStream(mis);
- Map<String, Object> map = MarshallingSupport.unmarshalPrimitiveMap(mdataIn);
- mdataIn.close();
- TypedProperties props = new TypedProperties();
- loadMapIntoProperties(props, map);
- props.encode(body.byteBuf());
+ writeMapType(contents, messageCompressed, body);
break;
case org.apache.activemq.artemis.api.core.Message.OBJECT_TYPE:
- if (messageCompressed) {
- try (InputStream ois = new InflaterInputStream(new ByteArrayInputStream(contents));
- org.apache.activemq.util.ByteArrayOutputStream decompressed = new org.apache.activemq.util.ByteArrayOutputStream()) {
- byte[] buf = new byte[1024];
- int n = ois.read(buf);
- while (n != -1) {
- decompressed.write(buf, 0, n);
- n = ois.read();
- }
- //read done
- contents = decompressed.toByteSequence();
- }
- }
- body.writeInt(contents.length);
- body.writeBytes(contents.data, contents.offset, contents.length);
+ writeObjectType(contents, messageCompressed, body);
break;
case org.apache.activemq.artemis.api.core.Message.STREAM_TYPE:
- InputStream sis = new ByteArrayInputStream(contents);
- if (messageCompressed) {
- sis = new InflaterInputStream(sis);
- }
- DataInputStream sdis = new DataInputStream(sis);
- int stype = sdis.read();
- while (stype != -1) {
- switch (stype) {
- case MarshallingSupport.BOOLEAN_TYPE:
- body.writeByte(DataConstants.BOOLEAN);
- body.writeBoolean(sdis.readBoolean());
- break;
- case MarshallingSupport.BYTE_TYPE:
- body.writeByte(DataConstants.BYTE);
- body.writeByte(sdis.readByte());
- break;
- case MarshallingSupport.BYTE_ARRAY_TYPE:
- body.writeByte(DataConstants.BYTES);
- int slen = sdis.readInt();
- byte[] sbytes = new byte[slen];
- sdis.read(sbytes);
- body.writeInt(slen);
- body.writeBytes(sbytes);
- break;
- case MarshallingSupport.CHAR_TYPE:
- body.writeByte(DataConstants.CHAR);
- char schar = sdis.readChar();
- body.writeShort((short) schar);
- break;
- case MarshallingSupport.DOUBLE_TYPE:
- body.writeByte(DataConstants.DOUBLE);
- double sdouble = sdis.readDouble();
- body.writeLong(Double.doubleToLongBits(sdouble));
- break;
- case MarshallingSupport.FLOAT_TYPE:
- body.writeByte(DataConstants.FLOAT);
- float sfloat = sdis.readFloat();
- body.writeInt(Float.floatToIntBits(sfloat));
- break;
- case MarshallingSupport.INTEGER_TYPE:
- body.writeByte(DataConstants.INT);
- body.writeInt(sdis.readInt());
- break;
- case MarshallingSupport.LONG_TYPE:
- body.writeByte(DataConstants.LONG);
- body.writeLong(sdis.readLong());
- break;
- case MarshallingSupport.SHORT_TYPE:
- body.writeByte(DataConstants.SHORT);
- body.writeShort(sdis.readShort());
- break;
- case MarshallingSupport.STRING_TYPE:
- body.writeByte(DataConstants.STRING);
- String sstring = sdis.readUTF();
- body.writeNullableString(sstring);
- break;
- case MarshallingSupport.BIG_STRING_TYPE:
- body.writeByte(DataConstants.STRING);
- String sbigString = MarshallingSupport.readUTF8(sdis);
- body.writeNullableString(sbigString);
- break;
- case MarshallingSupport.NULL:
- body.writeByte(DataConstants.STRING);
- body.writeNullableString(null);
- break;
- default:
- //something we don't know, ignore
- break;
- }
- stype = sdis.read();
- }
- sdis.close();
+ writeStreamType(contents, messageCompressed, body);
break;
case org.apache.activemq.artemis.api.core.Message.BYTES_TYPE:
- if (messageCompressed) {
- Inflater inflater = new Inflater();
- try (org.apache.activemq.util.ByteArrayOutputStream decompressed = new org.apache.activemq.util.ByteArrayOutputStream()) {
- int length = ByteSequenceData.readIntBig(contents);
- contents.offset = 0;
- byte[] data = Arrays.copyOfRange(contents.getData(), 4, contents.getLength());
-
- inflater.setInput(data);
- byte[] buffer = new byte[length];
- int count = inflater.inflate(buffer);
- decompressed.write(buffer, 0, count);
- contents = decompressed.toByteSequence();
- } catch (Exception e) {
- throw new IOException(e);
- } finally {
- inflater.end();
- }
- }
- body.writeBytes(contents.data, contents.offset, contents.length);
+ writeBytesType(contents, messageCompressed, body);
break;
default:
- if (messageCompressed) {
- try (org.apache.activemq.util.ByteArrayOutputStream decompressed = new org.apache.activemq.util.ByteArrayOutputStream();
- OutputStream os = new InflaterOutputStream(decompressed)) {
- os.write(contents.data, contents.offset, contents.getLength());
- contents = decompressed.toByteSequence();
- } catch (Exception e) {
- throw new IOException(e);
- }
- }
- body.writeBytes(contents.data, contents.offset, contents.length);
+ writeDefaultType(contents, messageCompressed, body);
break;
}
}
//amq specific
coreMessage.putLongProperty(AMQ_MSG_ARRIVAL, messageSend.getArrival());
coreMessage.putLongProperty(AMQ_MSG_BROKER_IN_TIME, messageSend.getBrokerInTime());
- BrokerId[] brokers = messageSend.getBrokerPath();
+ final BrokerId[] brokers = messageSend.getBrokerPath();
if (brokers != null) {
- StringBuilder builder = new StringBuilder();
- for (int i = 0; i < brokers.length; i++) {
- builder.append(brokers[i].getValue());
- if (i != (brokers.length - 1)) {
- builder.append(","); //is this separator safe?
- }
- }
- coreMessage.putStringProperty(AMQ_MSG_BROKER_PATH, builder.toString());
+ putMsgBrokerPath(brokers, coreMessage);
}
- BrokerId[] cluster = messageSend.getCluster();
+ final BrokerId[] cluster = messageSend.getCluster();
if (cluster != null) {
- StringBuilder builder = new StringBuilder();
- for (int i = 0; i < cluster.length; i++) {
- builder.append(cluster[i].getValue());
- if (i != (cluster.length - 1)) {
- builder.append(","); //is this separator safe?
- }
- }
- coreMessage.putStringProperty(AMQ_MSG_CLUSTER, builder.toString());
+ putMsgCluster(cluster, coreMessage);
}
coreMessage.putIntProperty(AMQ_MSG_COMMAND_ID, messageSend.getCommandId());
- String corrId = messageSend.getCorrelationId();
+ final String corrId = messageSend.getCorrelationId();
if (corrId != null) {
coreMessage.putStringProperty("JMSCorrelationID", corrId);
}
- DataStructure ds = messageSend.getDataStructure();
+ final DataStructure ds = messageSend.getDataStructure();
if (ds != null) {
- ByteSequence dsBytes = marshaller.marshal(ds);
- dsBytes.compact();
- coreMessage.putBytesProperty(AMQ_MSG_DATASTRUCTURE, dsBytes.data);
+ putMsgDataStructure(ds, marshaller, coreMessage);
}
- String groupId = messageSend.getGroupID();
+ final String groupId = messageSend.getGroupID();
if (groupId != null) {
coreMessage.putStringProperty(AMQ_MSG_GROUP_ID, groupId);
}
coreMessage.putIntProperty(AMQ_MSG_GROUP_SEQUENCE, messageSend.getGroupSequence());
- MessageId messageId = messageSend.getMessageId();
+ final MessageId messageId = messageSend.getMessageId();
- ByteSequence midBytes = marshaller.marshal(messageId);
+ final ByteSequence midBytes = marshaller.marshal(messageId);
midBytes.compact();
coreMessage.putBytesProperty(AMQ_MSG_MESSAGE_ID, midBytes.data);
- ProducerId producerId = messageSend.getProducerId();
+ final ProducerId producerId = messageSend.getProducerId();
if (producerId != null) {
- ByteSequence producerIdBytes = marshaller.marshal(producerId);
- producerIdBytes.compact();
- coreMessage.putBytesProperty(AMQ_MSG_PRODUCER_ID, producerIdBytes.data);
+ putMsgProducerId(producerId, marshaller, coreMessage);
}
- ByteSequence propBytes = messageSend.getMarshalledProperties();
+ final ByteSequence propBytes = messageSend.getMarshalledProperties();
if (propBytes != null) {
- propBytes.compact();
- coreMessage.putBytesProperty(AMQ_MSG_MARSHALL_PROP, propBytes.data);
- //unmarshall properties to core so selector will work
- Map<String, Object> props = messageSend.getProperties();
- //Map<String, Object> props = MarshallingSupport.unmarshalPrimitiveMap(new DataInputStream(new ByteArrayInputStream(propBytes)));
- for (Entry<String, Object> ent : props.entrySet()) {
- Object value = ent.getValue();
- try {
- coreMessage.putObjectProperty(ent.getKey(), value);
- } catch (ActiveMQPropertyConversionException e) {
- coreMessage.putStringProperty(ent.getKey(), value.toString());
- }
- }
+ putMsgMarshalledProperties(propBytes, messageSend, coreMessage);
}
- ActiveMQDestination replyTo = messageSend.getReplyTo();
+ final ActiveMQDestination replyTo = messageSend.getReplyTo();
if (replyTo != null) {
- ByteSequence replyToBytes = marshaller.marshal(replyTo);
- replyToBytes.compact();
- coreMessage.putBytesProperty(AMQ_MSG_REPLY_TO, replyToBytes.data);
+ putMsgReplyTo(replyTo, marshaller, coreMessage);
}
- String userId = messageSend.getUserID();
+ final String userId = messageSend.getUserID();
if (userId != null) {
coreMessage.putStringProperty(AMQ_MSG_USER_ID, userId);
}
coreMessage.putBooleanProperty(AMQ_MSG_DROPPABLE, messageSend.isDroppable());
- ActiveMQDestination origDest = messageSend.getOriginalDestination();
+ final ActiveMQDestination origDest = messageSend.getOriginalDestination();
if (origDest != null) {
- ByteSequence origDestBytes = marshaller.marshal(origDest);
- origDestBytes.compact();
- coreMessage.putBytesProperty(AMQ_MSG_ORIG_DESTINATION, origDestBytes.data);
+ putMsgOriginalDestination(origDest, marshaller, coreMessage);
}
return coreMessage;
}
+ private static void writeTextType(final ByteSequence contents,
+ final boolean messageCompressed,
+ final ActiveMQBuffer body) throws IOException {
+ InputStream tis = new ByteArrayInputStream(contents);
+ if (messageCompressed) {
+ tis = new InflaterInputStream(tis);
+ }
+ DataInputStream tdataIn = new DataInputStream(tis);
+ String text = MarshallingSupport.readUTF8(tdataIn);
+ tdataIn.close();
+ body.writeNullableSimpleString(new SimpleString(text));
+ }
+
+ private static void writeMapType(final ByteSequence contents,
+ final boolean messageCompressed,
+ final ActiveMQBuffer body) throws IOException {
+ InputStream mis = new ByteArrayInputStream(contents);
+ if (messageCompressed) {
+ mis = new InflaterInputStream(mis);
+ }
+ DataInputStream mdataIn = new DataInputStream(mis);
+ Map<String, Object> map = MarshallingSupport.unmarshalPrimitiveMap(mdataIn);
+ mdataIn.close();
+ TypedProperties props = new TypedProperties();
+ loadMapIntoProperties(props, map);
+ props.encode(body.byteBuf());
+ }
+
+ private static void writeObjectType(ByteSequence contents,
+ final boolean messageCompressed,
+ final ActiveMQBuffer body) throws IOException {
+ if (messageCompressed) {
+ contents = writeCompressedObjectType(contents);
+ }
+ body.writeInt(contents.length);
+ body.writeBytes(contents.data, contents.offset, contents.length);
+ }
+
+ private static ByteSequence writeCompressedObjectType(final ByteSequence contents) throws IOException {
+ try (InputStream ois = new InflaterInputStream(new ByteArrayInputStream(contents));
+ org.apache.activemq.util.ByteArrayOutputStream decompressed = new org.apache.activemq.util.ByteArrayOutputStream()) {
+ byte[] buf = new byte[1024];
+ int n = ois.read(buf);
+ while (n != -1) {
+ decompressed.write(buf, 0, n);
+ n = ois.read();
+ }
+ //read done
+ return decompressed.toByteSequence();
+ }
+ }
+
+ private static void writeStreamType(final ByteSequence contents,
+ final boolean messageCompressed,
+ final ActiveMQBuffer body) throws IOException {
+ InputStream sis = new ByteArrayInputStream(contents);
+ if (messageCompressed) {
+ sis = new InflaterInputStream(sis);
+ }
+ DataInputStream sdis = new DataInputStream(sis);
+ int stype = sdis.read();
+ while (stype != -1) {
+ switch (stype) {
+ case MarshallingSupport.BOOLEAN_TYPE:
+ body.writeByte(DataConstants.BOOLEAN);
+ body.writeBoolean(sdis.readBoolean());
+ break;
+ case MarshallingSupport.BYTE_TYPE:
+ body.writeByte(DataConstants.BYTE);
+ body.writeByte(sdis.readByte());
+ break;
+ case MarshallingSupport.BYTE_ARRAY_TYPE:
+ body.writeByte(DataConstants.BYTES);
+ int slen = sdis.readInt();
+ byte[] sbytes = new byte[slen];
+ sdis.read(sbytes);
+ body.writeInt(slen);
+ body.writeBytes(sbytes);
+ break;
+ case MarshallingSupport.CHAR_TYPE:
+ body.writeByte(DataConstants.CHAR);
+ char schar = sdis.readChar();
+ body.writeShort((short) schar);
+ break;
+ case MarshallingSupport.DOUBLE_TYPE:
+ body.writeByte(DataConstants.DOUBLE);
+ double sdouble = sdis.readDouble();
+ body.writeLong(Double.doubleToLongBits(sdouble));
+ break;
+ case MarshallingSupport.FLOAT_TYPE:
+ body.writeByte(DataConstants.FLOAT);
+ float sfloat = sdis.readFloat();
+ body.writeInt(Float.floatToIntBits(sfloat));
+ break;
+ case MarshallingSupport.INTEGER_TYPE:
+ body.writeByte(DataConstants.INT);
+ body.writeInt(sdis.readInt());
+ break;
+ case MarshallingSupport.LONG_TYPE:
+ body.writeByte(DataConstants.LONG);
+ body.writeLong(sdis.readLong());
+ break;
+ case MarshallingSupport.SHORT_TYPE:
+ body.writeByte(DataConstants.SHORT);
+ body.writeShort(sdis.readShort());
+ break;
+ case MarshallingSupport.STRING_TYPE:
+ body.writeByte(DataConstants.STRING);
+ String sstring = sdis.readUTF();
+ body.writeNullableString(sstring);
+ break;
+ case MarshallingSupport.BIG_STRING_TYPE:
+ body.writeByte(DataConstants.STRING);
+ String sbigString = MarshallingSupport.readUTF8(sdis);
+ body.writeNullableString(sbigString);
+ break;
+ case MarshallingSupport.NULL:
+ body.writeByte(DataConstants.STRING);
+ body.writeNullableString(null);
+ break;
+ default:
+ //something we don't know, ignore
+ break;
+ }
+ stype = sdis.read();
+ }
+ sdis.close();
+ }
+
+ private static void writeBytesType(ByteSequence contents,
+ final boolean messageCompressed,
+ final ActiveMQBuffer body) throws IOException {
+ if (messageCompressed) {
+ contents = writeCompressedBytesType(contents);
+ }
+ body.writeBytes(contents.data, contents.offset, contents.length);
+ }
+
+ private static ByteSequence writeCompressedBytesType(final ByteSequence contents) throws IOException {
+ Inflater inflater = new Inflater();
+ try (org.apache.activemq.util.ByteArrayOutputStream decompressed = new org.apache.activemq.util.ByteArrayOutputStream()) {
+ int length = ByteSequenceData.readIntBig(contents);
+ contents.offset = 0;
+ byte[] data = Arrays.copyOfRange(contents.getData(), 4, contents.getLength());
+
+ inflater.setInput(data);
+ byte[] buffer = new byte[length];
+ int count = inflater.inflate(buffer);
+ decompressed.write(buffer, 0, count);
+ return decompressed.toByteSequence();
+ } catch (Exception e) {
+ throw new IOException(e);
+ } finally {
+ inflater.end();
+ }
+ }
+
+ private static void writeDefaultType(ByteSequence contents,
+ final boolean messageCompressed,
+ final ActiveMQBuffer body) throws IOException {
+ if (messageCompressed) {
+ contents = writeCompressedDefaultType(contents);
+ }
+ body.writeBytes(contents.data, contents.offset, contents.length);
+ }
+
+ private static ByteSequence writeCompressedDefaultType(final ByteSequence contents) throws IOException {
+ try (org.apache.activemq.util.ByteArrayOutputStream decompressed = new org.apache.activemq.util.ByteArrayOutputStream();
+ OutputStream os = new InflaterOutputStream(decompressed)) {
+ os.write(contents.data, contents.offset, contents.getLength());
+ return decompressed.toByteSequence();
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ }
+
+ private static void putMsgBrokerPath(final BrokerId[] brokers, final CoreMessage coreMessage) {
+ final StringBuilder builder = new StringBuilder();
+ for (int i = 0, size = brokers.length; i < size; i++) {
+ builder.append(brokers[i].getValue());
+ if (i != (size - 1)) {
+ builder.append(','); //is this separator safe?
+ }
+ }
+ coreMessage.putStringProperty(AMQ_MSG_BROKER_PATH, builder.toString());
+ }
+
+ private static void putMsgCluster(final BrokerId[] cluster, final CoreMessage coreMessage) {
+ final StringBuilder builder = new StringBuilder();
+ for (int i = 0, size = cluster.length; i < size; i++) {
+ builder.append(cluster[i].getValue());
+ if (i != (size - 1)) {
+ builder.append(','); //is this separator safe?
+ }
+ }
+ coreMessage.putStringProperty(AMQ_MSG_CLUSTER, builder.toString());
+ }
+
+ private static void putMsgDataStructure(final DataStructure ds,
+ final WireFormat marshaller,
+ final CoreMessage coreMessage) throws IOException {
+ final ByteSequence dsBytes = marshaller.marshal(ds);
+ dsBytes.compact();
+ coreMessage.putBytesProperty(AMQ_MSG_DATASTRUCTURE, dsBytes.data);
+ }
+
+ private static void putMsgProducerId(final ProducerId producerId,
+ final WireFormat marshaller,
+ final CoreMessage coreMessage) throws IOException {
+ final ByteSequence producerIdBytes = marshaller.marshal(producerId);
+ producerIdBytes.compact();
+ coreMessage.putBytesProperty(AMQ_MSG_PRODUCER_ID, producerIdBytes.data);
+ }
+
+ private static void putMsgMarshalledProperties(final ByteSequence propBytes,
+ final Message messageSend,
+ final CoreMessage coreMessage) throws IOException {
+ propBytes.compact();
+ coreMessage.putBytesProperty(AMQ_MSG_MARSHALL_PROP, propBytes.data);
+ //unmarshall properties to core so selector will work
+ final Map<String, Object> props = messageSend.getProperties();
+ if (!props.isEmpty()) {
+ props.forEach((key, value) -> {
+ try {
+ coreMessage.putObjectProperty(key, value);
+ } catch (ActiveMQPropertyConversionException e) {
+ coreMessage.putStringProperty(key, value.toString());
+ }
+ });
+ }
+ }
+
+ private static void putMsgReplyTo(final ActiveMQDestination replyTo,
+ final WireFormat marshaller,
+ final CoreMessage coreMessage) throws IOException {
+ final ByteSequence replyToBytes = marshaller.marshal(replyTo);
+ replyToBytes.compact();
+ coreMessage.putBytesProperty(AMQ_MSG_REPLY_TO, replyToBytes.data);
+ }
+
+ private static void putMsgOriginalDestination(final ActiveMQDestination origDest,
+ final WireFormat marshaller,
+ final CoreMessage coreMessage) throws IOException {
+ final ByteSequence origDestBytes = marshaller.marshal(origDest);
+ origDestBytes.compact();
+ coreMessage.putBytesProperty(AMQ_MSG_ORIG_DESTINATION, origDestBytes.data);
+ }
+
private static void loadMapIntoProperties(TypedProperties props, Map<String, Object> map) {
for (Entry<String, Object> entry : map.entrySet()) {
SimpleString key = new SimpleString(entry.getKey());
[06/10] activemq-artemis git commit: ARTEMIS-1616 OpenWire
improvements
Posted by cl...@apache.org.
ARTEMIS-1616 OpenWire improvements
Used SimpleString on OpenWireMessageConverter to avoid translations on CoreMessage
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/64724c35
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/64724c35
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/64724c35
Branch: refs/heads/master
Commit: 64724c3520586c6cd1bc0aec9942ae5bb5562459
Parents: 17c0a33
Author: Francesco Nigro <ni...@gmail.com>
Authored: Tue Jan 16 22:24:08 2018 +0100
Committer: Clebert Suconic <cl...@apache.org>
Committed: Mon Jan 22 18:02:03 2018 -0500
----------------------------------------------------------------------
.../openwire/OpenWireMessageConverter.java | 331 ++++++++++---------
.../core/protocol/openwire/amq/AMQConsumer.java | 2 +-
2 files changed, 167 insertions(+), 166 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64724c35/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
index 54f3c99..457593d 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
@@ -71,29 +71,31 @@ import org.fusesource.hawtbuf.UTF8Buffer;
public class OpenWireMessageConverter implements MessageConverter<OpenwireMessage> {
- public static final String AMQ_PREFIX = "__HDR_";
- public static final String AMQ_MSG_DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY = AMQ_PREFIX + "dlqDeliveryFailureCause";
-
- private static final String AMQ_MSG_ARRIVAL = AMQ_PREFIX + "ARRIVAL";
- private static final String AMQ_MSG_BROKER_IN_TIME = AMQ_PREFIX + "BROKER_IN_TIME";
-
- private static final String AMQ_MSG_BROKER_PATH = AMQ_PREFIX + "BROKER_PATH";
- private static final String AMQ_MSG_CLUSTER = AMQ_PREFIX + "CLUSTER";
- private static final String AMQ_MSG_COMMAND_ID = AMQ_PREFIX + "COMMAND_ID";
- private static final String AMQ_MSG_DATASTRUCTURE = AMQ_PREFIX + "DATASTRUCTURE";
- private static final String AMQ_MSG_GROUP_ID = org.apache.activemq.artemis.api.core.Message.HDR_GROUP_ID.toString();
- private static final String AMQ_MSG_GROUP_SEQUENCE = AMQ_PREFIX + "GROUP_SEQUENCE";
- private static final String AMQ_MSG_MESSAGE_ID = AMQ_PREFIX + "MESSAGE_ID";
- private static final String AMQ_MSG_ORIG_DESTINATION = AMQ_PREFIX + "ORIG_DESTINATION";
- private static final String AMQ_MSG_ORIG_TXID = AMQ_PREFIX + "ORIG_TXID";
- private static final String AMQ_MSG_PRODUCER_ID = AMQ_PREFIX + "PRODUCER_ID";
- private static final String AMQ_MSG_MARSHALL_PROP = AMQ_PREFIX + "MARSHALL_PROP";
- private static final String AMQ_MSG_REPLY_TO = AMQ_PREFIX + "REPLY_TO";
-
- private static final String AMQ_MSG_USER_ID = AMQ_PREFIX + "USER_ID";
-
- private static final String AMQ_MSG_DROPPABLE = AMQ_PREFIX + "DROPPABLE";
- private static final String AMQ_MSG_COMPRESSED = AMQ_PREFIX + "COMPRESSED";
+ private static final SimpleString JMS_TYPE_PROPERTY = new SimpleString("JMSType");
+ private static final SimpleString JMS_CORRELATION_ID_PROPERTY = new SimpleString("JMSCorrelationID");
+ private static final SimpleString AMQ_PREFIX = new SimpleString("__HDR_");
+ public static final SimpleString AMQ_MSG_DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY = new SimpleString(AMQ_PREFIX + "dlqDeliveryFailureCause");
+
+ private static final SimpleString AMQ_MSG_ARRIVAL = new SimpleString(AMQ_PREFIX + "ARRIVAL");
+ private static final SimpleString AMQ_MSG_BROKER_IN_TIME = new SimpleString(AMQ_PREFIX + "BROKER_IN_TIME");
+
+ private static final SimpleString AMQ_MSG_BROKER_PATH = new SimpleString(AMQ_PREFIX + "BROKER_PATH");
+ private static final SimpleString AMQ_MSG_CLUSTER = new SimpleString(AMQ_PREFIX + "CLUSTER");
+ private static final SimpleString AMQ_MSG_COMMAND_ID = new SimpleString(AMQ_PREFIX + "COMMAND_ID");
+ private static final SimpleString AMQ_MSG_DATASTRUCTURE = new SimpleString(AMQ_PREFIX + "DATASTRUCTURE");
+ private static final SimpleString AMQ_MSG_GROUP_ID = org.apache.activemq.artemis.api.core.Message.HDR_GROUP_ID;
+ private static final SimpleString AMQ_MSG_GROUP_SEQUENCE = new SimpleString(AMQ_PREFIX + "GROUP_SEQUENCE");
+ private static final SimpleString AMQ_MSG_MESSAGE_ID = new SimpleString(AMQ_PREFIX + "MESSAGE_ID");
+ private static final SimpleString AMQ_MSG_ORIG_DESTINATION = new SimpleString(AMQ_PREFIX + "ORIG_DESTINATION");
+ private static final SimpleString AMQ_MSG_ORIG_TXID = new SimpleString(AMQ_PREFIX + "ORIG_TXID");
+ private static final SimpleString AMQ_MSG_PRODUCER_ID = new SimpleString(AMQ_PREFIX + "PRODUCER_ID");
+ private static final SimpleString AMQ_MSG_MARSHALL_PROP = new SimpleString(AMQ_PREFIX + "MARSHALL_PROP");
+ private static final SimpleString AMQ_MSG_REPLY_TO = new SimpleString(AMQ_PREFIX + "REPLY_TO");
+
+ private static final SimpleString AMQ_MSG_USER_ID = new SimpleString(AMQ_PREFIX + "USER_ID");
+
+ private static final SimpleString AMQ_MSG_DROPPABLE = new SimpleString(AMQ_PREFIX + "DROPPABLE");
+ private static final SimpleString AMQ_MSG_COMPRESSED = new SimpleString(AMQ_PREFIX + "COMPRESSED");
private static final String AMQ_NOTIFICATIONS_DESTINATION = "activemq.notifications";
@@ -128,7 +130,7 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
final String type = messageSend.getType();
if (type != null) {
- coreMessage.putStringProperty(new SimpleString("JMSType"), new SimpleString(type));
+ coreMessage.putStringProperty(JMS_TYPE_PROPERTY, new SimpleString(type));
}
coreMessage.setDurable(messageSend.isPersistent());
coreMessage.setExpiration(messageSend.getExpiration());
@@ -185,7 +187,7 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
coreMessage.putIntProperty(AMQ_MSG_COMMAND_ID, messageSend.getCommandId());
final String corrId = messageSend.getCorrelationId();
if (corrId != null) {
- coreMessage.putStringProperty("JMSCorrelationID", corrId);
+ coreMessage.putStringProperty(JMS_CORRELATION_ID_PROPERTY, new SimpleString(corrId));
}
final DataStructure ds = messageSend.getDataStructure();
if (ds != null) {
@@ -193,7 +195,7 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
}
final String groupId = messageSend.getGroupID();
if (groupId != null) {
- coreMessage.putStringProperty(AMQ_MSG_GROUP_ID, groupId);
+ coreMessage.putStringProperty(AMQ_MSG_GROUP_ID, new SimpleString(groupId));
}
coreMessage.putIntProperty(AMQ_MSG_GROUP_SEQUENCE, messageSend.getGroupSequence());
@@ -219,7 +221,7 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
final String userId = messageSend.getUserID();
if (userId != null) {
- coreMessage.putStringProperty(AMQ_MSG_USER_ID, userId);
+ coreMessage.putStringProperty(AMQ_MSG_USER_ID, new SimpleString(userId));
}
coreMessage.putBooleanProperty(AMQ_MSG_DROPPABLE, messageSend.isDroppable());
@@ -415,7 +417,7 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
builder.append(','); //is this separator safe?
}
}
- coreMessage.putStringProperty(AMQ_MSG_BROKER_PATH, builder.toString());
+ coreMessage.putStringProperty(AMQ_MSG_BROKER_PATH, new SimpleString(builder.toString()));
}
private static void putMsgCluster(final BrokerId[] cluster, final CoreMessage coreMessage) {
@@ -426,7 +428,7 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
builder.append(','); //is this separator safe?
}
}
- coreMessage.putStringProperty(AMQ_MSG_CLUSTER, builder.toString());
+ coreMessage.putStringProperty(AMQ_MSG_CLUSTER, new SimpleString(builder.toString()));
}
private static void putMsgDataStructure(final DataStructure ds,
@@ -557,7 +559,7 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
throw new IllegalStateException("Unknown message type: " + coreMessage.getType());
}
- String type = coreMessage.getStringProperty(new SimpleString("JMSType"));
+ String type = coreMessage.getStringProperty(JMS_TYPE_PROPERTY);
if (type != null) {
amqMsg.setJMSType(type);
}
@@ -580,156 +582,155 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
byte[] bytes = null;
if (buffer != null) {
buffer.resetReaderIndex();
- synchronized (buffer) {
- if (coreType == org.apache.activemq.artemis.api.core.Message.TEXT_TYPE) {
- SimpleString text = buffer.readNullableSimpleString();
- if (text != null) {
- ByteArrayOutputStream bytesOut = new ByteArrayOutputStream(text.length() + 4);
- OutputStream out = bytesOut;
- if (isCompressed) {
- out = new DeflaterOutputStream(out, true);
- }
- try (DataOutputStream dataOut = new DataOutputStream(out)) {
- MarshallingSupport.writeUTF8(dataOut, text.toString());
- dataOut.flush();
- bytes = bytesOut.toByteArray();
- }
- }
- } else if (coreType == org.apache.activemq.artemis.api.core.Message.MAP_TYPE) {
- TypedProperties mapData = new TypedProperties();
- //it could be a null map
- if (buffer.readableBytes() > 0) {
- mapData.decode(buffer.byteBuf());
- Map<String, Object> map = mapData.getMap();
- ByteArrayOutputStream out = new ByteArrayOutputStream(mapData.getEncodeSize());
- OutputStream os = out;
- if (isCompressed) {
- os = new DeflaterOutputStream(os, true);
- }
- try (DataOutputStream dataOut = new DataOutputStream(os)) {
- MarshallingSupport.marshalPrimitiveMap(map, dataOut);
- dataOut.flush();
- }
- bytes = out.toByteArray();
- }
- } else if (coreType == org.apache.activemq.artemis.api.core.Message.OBJECT_TYPE) {
- if (buffer.readableBytes() > 0) {
- int len = buffer.readInt();
- bytes = new byte[len];
- buffer.readBytes(bytes);
- if (isCompressed) {
- ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
- try (DeflaterOutputStream out = new DeflaterOutputStream(bytesOut, true)) {
- out.write(bytes);
- out.flush();
- }
- bytes = bytesOut.toByteArray();
- }
- }
- } else if (coreType == org.apache.activemq.artemis.api.core.Message.STREAM_TYPE) {
- org.apache.activemq.util.ByteArrayOutputStream bytesOut = new org.apache.activemq.util.ByteArrayOutputStream();
+ if (coreType == org.apache.activemq.artemis.api.core.Message.TEXT_TYPE) {
+ SimpleString text = buffer.readNullableSimpleString();
+ if (text != null) {
+ ByteArrayOutputStream bytesOut = new ByteArrayOutputStream(text.length() + 4);
OutputStream out = bytesOut;
if (isCompressed) {
- out = new DeflaterOutputStream(bytesOut, true);
+ out = new DeflaterOutputStream(out, true);
}
try (DataOutputStream dataOut = new DataOutputStream(out)) {
-
- boolean stop = false;
- while (!stop && buffer.readable()) {
- byte primitiveType = buffer.readByte();
- switch (primitiveType) {
- case DataConstants.BOOLEAN:
- MarshallingSupport.marshalBoolean(dataOut, buffer.readBoolean());
- break;
- case DataConstants.BYTE:
- MarshallingSupport.marshalByte(dataOut, buffer.readByte());
- break;
- case DataConstants.BYTES:
- int len = buffer.readInt();
- byte[] bytesData = new byte[len];
- buffer.readBytes(bytesData);
- MarshallingSupport.marshalByteArray(dataOut, bytesData);
- break;
- case DataConstants.CHAR:
- char ch = (char) buffer.readShort();
- MarshallingSupport.marshalChar(dataOut, ch);
- break;
- case DataConstants.DOUBLE:
- double doubleVal = Double.longBitsToDouble(buffer.readLong());
- MarshallingSupport.marshalDouble(dataOut, doubleVal);
- break;
- case DataConstants.FLOAT:
- Float floatVal = Float.intBitsToFloat(buffer.readInt());
- MarshallingSupport.marshalFloat(dataOut, floatVal);
- break;
- case DataConstants.INT:
- MarshallingSupport.marshalInt(dataOut, buffer.readInt());
- break;
- case DataConstants.LONG:
- MarshallingSupport.marshalLong(dataOut, buffer.readLong());
- break;
- case DataConstants.SHORT:
- MarshallingSupport.marshalShort(dataOut, buffer.readShort());
- break;
- case DataConstants.STRING:
- String string = buffer.readNullableString();
- if (string == null) {
- MarshallingSupport.marshalNull(dataOut);
- } else {
- MarshallingSupport.marshalString(dataOut, string);
- }
- break;
- default:
- //now we stop
- stop = true;
- break;
- }
- dataOut.flush();
- }
+ MarshallingSupport.writeUTF8(dataOut, text.toString());
+ dataOut.flush();
+ bytes = bytesOut.toByteArray();
}
- bytes = bytesOut.toByteArray();
- } else if (coreType == org.apache.activemq.artemis.api.core.Message.BYTES_TYPE) {
- int n = buffer.readableBytes();
- bytes = new byte[n];
- buffer.readBytes(bytes);
+ }
+ } else if (coreType == org.apache.activemq.artemis.api.core.Message.MAP_TYPE) {
+ TypedProperties mapData = new TypedProperties();
+ //it could be a null map
+ if (buffer.readableBytes() > 0) {
+ mapData.decode(buffer.byteBuf());
+ Map<String, Object> map = mapData.getMap();
+ ByteArrayOutputStream out = new ByteArrayOutputStream(mapData.getEncodeSize());
+ OutputStream os = out;
if (isCompressed) {
- int length = bytes.length;
- Deflater deflater = new Deflater();
- try (org.apache.activemq.util.ByteArrayOutputStream compressed = new org.apache.activemq.util.ByteArrayOutputStream()) {
- compressed.write(new byte[4]);
- deflater.setInput(bytes);
- deflater.finish();
- byte[] bytesBuf = new byte[1024];
- while (!deflater.finished()) {
- int count = deflater.deflate(bytesBuf);
- compressed.write(bytesBuf, 0, count);
- }
- compressed.flush();
- ByteSequence byteSeq = compressed.toByteSequence();
- ByteSequenceData.writeIntBig(byteSeq, length);
- bytes = Arrays.copyOfRange(byteSeq.data, 0, byteSeq.length);
- } finally {
- deflater.end();
- }
+ os = new DeflaterOutputStream(os, true);
+ }
+ try (DataOutputStream dataOut = new DataOutputStream(os)) {
+ MarshallingSupport.marshalPrimitiveMap(map, dataOut);
+ dataOut.flush();
}
- } else {
- int n = buffer.readableBytes();
- bytes = new byte[n];
+ bytes = out.toByteArray();
+ }
+
+ } else if (coreType == org.apache.activemq.artemis.api.core.Message.OBJECT_TYPE) {
+ if (buffer.readableBytes() > 0) {
+ int len = buffer.readInt();
+ bytes = new byte[len];
buffer.readBytes(bytes);
if (isCompressed) {
- try (ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
- DeflaterOutputStream out = new DeflaterOutputStream(bytesOut, true)) {
+ ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
+ try (DeflaterOutputStream out = new DeflaterOutputStream(bytesOut, true)) {
out.write(bytes);
out.flush();
- bytes = bytesOut.toByteArray();
}
+ bytes = bytesOut.toByteArray();
+ }
+ }
+ } else if (coreType == org.apache.activemq.artemis.api.core.Message.STREAM_TYPE) {
+ org.apache.activemq.util.ByteArrayOutputStream bytesOut = new org.apache.activemq.util.ByteArrayOutputStream();
+ OutputStream out = bytesOut;
+ if (isCompressed) {
+ out = new DeflaterOutputStream(bytesOut, true);
+ }
+ try (DataOutputStream dataOut = new DataOutputStream(out)) {
+
+ boolean stop = false;
+ while (!stop && buffer.readable()) {
+ byte primitiveType = buffer.readByte();
+ switch (primitiveType) {
+ case DataConstants.BOOLEAN:
+ MarshallingSupport.marshalBoolean(dataOut, buffer.readBoolean());
+ break;
+ case DataConstants.BYTE:
+ MarshallingSupport.marshalByte(dataOut, buffer.readByte());
+ break;
+ case DataConstants.BYTES:
+ int len = buffer.readInt();
+ byte[] bytesData = new byte[len];
+ buffer.readBytes(bytesData);
+ MarshallingSupport.marshalByteArray(dataOut, bytesData);
+ break;
+ case DataConstants.CHAR:
+ char ch = (char) buffer.readShort();
+ MarshallingSupport.marshalChar(dataOut, ch);
+ break;
+ case DataConstants.DOUBLE:
+ double doubleVal = Double.longBitsToDouble(buffer.readLong());
+ MarshallingSupport.marshalDouble(dataOut, doubleVal);
+ break;
+ case DataConstants.FLOAT:
+ Float floatVal = Float.intBitsToFloat(buffer.readInt());
+ MarshallingSupport.marshalFloat(dataOut, floatVal);
+ break;
+ case DataConstants.INT:
+ MarshallingSupport.marshalInt(dataOut, buffer.readInt());
+ break;
+ case DataConstants.LONG:
+ MarshallingSupport.marshalLong(dataOut, buffer.readLong());
+ break;
+ case DataConstants.SHORT:
+ MarshallingSupport.marshalShort(dataOut, buffer.readShort());
+ break;
+ case DataConstants.STRING:
+ String string = buffer.readNullableString();
+ if (string == null) {
+ MarshallingSupport.marshalNull(dataOut);
+ } else {
+ MarshallingSupport.marshalString(dataOut, string);
+ }
+ break;
+ default:
+ //now we stop
+ stop = true;
+ break;
+ }
+ dataOut.flush();
+ }
+ }
+ bytes = bytesOut.toByteArray();
+ } else if (coreType == org.apache.activemq.artemis.api.core.Message.BYTES_TYPE) {
+ int n = buffer.readableBytes();
+ bytes = new byte[n];
+ buffer.readBytes(bytes);
+ if (isCompressed) {
+ int length = bytes.length;
+ Deflater deflater = new Deflater();
+ try (org.apache.activemq.util.ByteArrayOutputStream compressed = new org.apache.activemq.util.ByteArrayOutputStream()) {
+ compressed.write(new byte[4]);
+ deflater.setInput(bytes);
+ deflater.finish();
+ byte[] bytesBuf = new byte[1024];
+ while (!deflater.finished()) {
+ int count = deflater.deflate(bytesBuf);
+ compressed.write(bytesBuf, 0, count);
+ }
+ compressed.flush();
+ ByteSequence byteSeq = compressed.toByteSequence();
+ ByteSequenceData.writeIntBig(byteSeq, length);
+ bytes = Arrays.copyOfRange(byteSeq.data, 0, byteSeq.length);
+ } finally {
+ deflater.end();
+ }
+ }
+ } else {
+ int n = buffer.readableBytes();
+ bytes = new byte[n];
+ buffer.readBytes(bytes);
+ if (isCompressed) {
+ try (ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
+ DeflaterOutputStream out = new DeflaterOutputStream(bytesOut, true)) {
+ out.write(bytes);
+ out.flush();
+ bytes = bytesOut.toByteArray();
}
}
-
- buffer.resetReaderIndex();// this is important for topics as the buffer
- // may be read multiple times
}
+
+ buffer.resetReaderIndex();// this is important for topics as the buffer
+ // may be read multiple times
}
//we need check null because messages may come from other clients
@@ -767,7 +768,7 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
}
amqMsg.setCommandId(commandId);
- SimpleString corrId = (SimpleString) coreMessage.getObjectProperty("JMSCorrelationID");
+ SimpleString corrId = (SimpleString) coreMessage.getObjectProperty(JMS_CORRELATION_ID_PROPERTY);
if (corrId != null) {
amqMsg.setCorrelationId(corrId.toString());
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64724c35/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
index 57865b7..77051cc 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
@@ -286,7 +286,7 @@ public class AMQConsumer {
for (MessageReference ref : ackList) {
Throwable poisonCause = ack.getPoisonCause();
if (poisonCause != null) {
- ref.getMessage().putStringProperty(OpenWireMessageConverter.AMQ_MSG_DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY, poisonCause.toString());
+ ref.getMessage().putStringProperty(OpenWireMessageConverter.AMQ_MSG_DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY, new SimpleString(poisonCause.toString()));
}
ref.getQueue().sendToDeadLetterAddress(transaction, ref);
}