You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2018/01/22 23:06:05 UTC

[01/10] activemq-artemis git commit: ARTEMIS-1616 OpenWire improvements

Repository: activemq-artemis
Updated Branches:
  refs/heads/master 04a9884d3 -> b74018192


ARTEMIS-1616 OpenWire improvements

Refactored OpenWireMessageConverter::inbound into a private static method


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/0387b1a8
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/0387b1a8
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/0387b1a8

Branch: refs/heads/master
Commit: 0387b1a8427673f42166df5651bc504d38f365ce
Parents: c6b6dd9
Author: Francesco Nigro <ni...@gmail.com>
Authored: Mon Jan 22 19:57:29 2018 +0100
Committer: Clebert Suconic <cl...@apache.org>
Committed: Mon Jan 22 18:02:03 2018 -0500

----------------------------------------------------------------------
 .../core/protocol/openwire/OpenWireMessageConverter.java  | 10 +++++++---
 1 file changed, 7 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0387b1a8/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
index 3dc4a4e..19dc802 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
@@ -120,10 +120,14 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
       return null;
    }
 
-   //   @Override
-   public org.apache.activemq.artemis.api.core.Message inbound(Object message, CoreMessageObjectPools coreMessageObjectPools) throws Exception {
+   public org.apache.activemq.artemis.api.core.Message inbound(Message message, CoreMessageObjectPools coreMessageObjectPools) throws Exception {
+      return inbound(message, marshaller, coreMessageObjectPools);
+   }
+
+   private static org.apache.activemq.artemis.api.core.Message inbound(final Message messageSend,
+                                                                       final WireFormat marshaller,
+                                                                       final CoreMessageObjectPools coreMessageObjectPools) throws Exception {
 
-      final Message messageSend = (Message) message;
       final CoreMessage coreMessage = new CoreMessage(-1, messageSend.getSize(), coreMessageObjectPools);
 
       final String type = messageSend.getType();


[03/10] activemq-artemis git commit: ARTEMIS-1616 OpenWire improvements

Posted by cl...@apache.org.
ARTEMIS-1616 OpenWire improvements

Added existing queues cache to avoid multiple expensive AMQSession::checkAutoCreateQueue calls


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/17c0a331
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/17c0a331
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/17c0a331

Branch: refs/heads/master
Commit: 17c0a331aceda16460cc4d87f376c8730e479fe5
Parents: 051a3ca
Author: Francesco Nigro <ni...@gmail.com>
Authored: Tue Jan 16 21:27:22 2018 +0100
Committer: Clebert Suconic <cl...@apache.org>
Committed: Mon Jan 22 18:02:03 2018 -0500

----------------------------------------------------------------------
 .../core/protocol/openwire/amq/AMQSession.java  | 37 ++++++++++++++++++--
 1 file changed, 34 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/17c0a331/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
index 3ff3ae1..ad15fe7 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
@@ -89,6 +89,8 @@ public class AMQSession implements SessionCallback {
 
    private final CoreMessageObjectPools coreMessageObjectPools = new CoreMessageObjectPools();
 
+   private String[] existingQueuesCache;
+
    public AMQSession(ConnectionInfo connInfo,
                      SessionInfo sessInfo,
                      ActiveMQServer server,
@@ -105,6 +107,7 @@ public class AMQSession implements SessionCallback {
 
       this.converter = new OpenWireMessageConverter(marshaller.copy());
       this.enableAutoReadAndTtl = this::enableAutoReadAndTtl;
+      this.existingQueuesCache = null;
    }
 
    public boolean isClosed() {
@@ -193,6 +196,33 @@ public class AMQSession implements SessionCallback {
       return consumersList;
    }
 
+   private boolean checkCachedExistingQueues(final SimpleString address,
+                                             final String physicalName,
+                                             final boolean isTemporary) throws Exception {
+      String[] existingQueuesCache = this.existingQueuesCache;
+      //lazy allocation of the cache
+      if (existingQueuesCache == null) {
+         //16 means 64 bytes with 32 bit references or 128 bytes with 64 bit references -> 1 or 2 cache lines with common archs
+         existingQueuesCache = new String[16];
+         assert (Integer.bitCount(existingQueuesCache.length) == 1) : "existingQueuesCache.length must be power of 2";
+         this.existingQueuesCache = existingQueuesCache;
+      }
+      final int hashCode = physicalName.hashCode();
+      //this.existingQueuesCache.length must be power of 2
+      final int mask = existingQueuesCache.length - 1;
+      final int index = hashCode & mask;
+      final String existingQueue = existingQueuesCache[index];
+      if (existingQueue != null && existingQueue.equals(physicalName)) {
+         //if the information is stale (ie no longer valid) it will fail later
+         return true;
+      }
+      final boolean hasQueue = checkAutoCreateQueue(address, isTemporary);
+      if (hasQueue) {
+         existingQueuesCache[index] = physicalName;
+      }
+      return hasQueue;
+   }
+
    private boolean checkAutoCreateQueue(SimpleString queueName, boolean isTemporary) throws Exception {
       boolean hasQueue = true;
       if (!connection.containsKnownDestination(queueName)) {
@@ -350,7 +380,7 @@ public class AMQSession implements SessionCallback {
          originalCoreMsg.putStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID.toString(), messageSend.getMessageId().toString());
       }
 
-      boolean shouldBlockProducer = producerInfo.getWindowSize() > 0 || messageSend.isResponseRequired();
+      final boolean shouldBlockProducer = producerInfo.getWindowSize() > 0 || messageSend.isResponseRequired();
 
       final AtomicInteger count = new AtomicInteger(actualDestinations.length);
 
@@ -361,13 +391,14 @@ public class AMQSession implements SessionCallback {
 
       for (int i = 0, actualDestinationsCount = actualDestinations.length; i < actualDestinationsCount; i++) {
          final ActiveMQDestination dest = actualDestinations[i];
-         final SimpleString address = SimpleString.toSimpleString(dest.getPhysicalName(), coreMessageObjectPools.getAddressStringSimpleStringPool());
+         final String physicalName = dest.getPhysicalName();
+         final SimpleString address = SimpleString.toSimpleString(physicalName, coreMessageObjectPools.getAddressStringSimpleStringPool());
          //the last coreMsg could be directly the original one -> it avoid 1 copy if actualDestinations > 1 and ANY copy if actualDestinations == 1
          final org.apache.activemq.artemis.api.core.Message coreMsg = (i == actualDestinationsCount - 1) ? originalCoreMsg : originalCoreMsg.copy();
          coreMsg.setAddress(address);
 
          if (dest.isQueue()) {
-            checkAutoCreateQueue(address, dest.isTemporary());
+            checkCachedExistingQueues(address, physicalName, dest.isTemporary());
             coreMsg.setRoutingType(RoutingType.ANYCAST);
          } else {
             coreMsg.setRoutingType(RoutingType.MULTICAST);


[10/10] activemq-artemis git commit: This closes #1786

Posted by cl...@apache.org.
This closes #1786


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/b7401819
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/b7401819
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/b7401819

Branch: refs/heads/master
Commit: b7401819295e9ff32fb023beb22087afbdbaa4e9
Parents: 04a9884 0387b1a
Author: Clebert Suconic <cl...@apache.org>
Authored: Mon Jan 22 18:02:04 2018 -0500
Committer: Clebert Suconic <cl...@apache.org>
Committed: Mon Jan 22 18:02:04 2018 -0500

----------------------------------------------------------------------
 .../activemq/artemis/api/core/SimpleString.java |   65 ++
 .../openwire/OpenWireMessageConverter.java      | 1083 ++++++++++--------
 .../core/protocol/openwire/amq/AMQConsumer.java |   11 +-
 .../core/protocol/openwire/amq/AMQSession.java  |   70 +-
 4 files changed, 748 insertions(+), 481 deletions(-)
----------------------------------------------------------------------



[07/10] activemq-artemis git commit: ARTEMIS-1616 OpenWire improvements

Posted by cl...@apache.org.
ARTEMIS-1616 OpenWire improvements

Used SimpleString on AMQSession with HDR_DUPLICATE_DETECTION_ID and CONNECTION_ID_PROPERTY_NAME


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/e7a1dca7
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/e7a1dca7
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/e7a1dca7

Branch: refs/heads/master
Commit: e7a1dca7b54bc47002b5d743115c1c790ead9579
Parents: 9650c80
Author: Francesco Nigro <ni...@gmail.com>
Authored: Wed Jan 17 08:29:28 2018 +0100
Committer: Clebert Suconic <cl...@apache.org>
Committed: Mon Jan 22 18:02:03 2018 -0500

----------------------------------------------------------------------
 .../activemq/artemis/core/protocol/openwire/amq/AMQSession.java  | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e7a1dca7/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
index ad15fe7..d6fe390 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
@@ -370,14 +370,14 @@ public class AMQSession implements SessionCallback {
 
       final org.apache.activemq.artemis.api.core.Message originalCoreMsg = getConverter().inbound(messageSend, coreMessageObjectPools);
 
-      originalCoreMsg.putStringProperty(MessageUtil.CONNECTION_ID_PROPERTY_NAME.toString(), this.connection.getState().getInfo().getClientId());
+      originalCoreMsg.putStringProperty(MessageUtil.CONNECTION_ID_PROPERTY_NAME, SimpleString.toSimpleString(this.connection.getState().getInfo().getClientId()));
 
       /* ActiveMQ failover transport will attempt to reconnect after connection failure.  Any sent messages that did
       * not receive acks will be resent.  (ActiveMQ broker handles this by returning a last sequence id received to
       * the client).  To handle this in Artemis we use a duplicate ID cache.  To do this we check to see if the
       * message comes from failover connection.  If so we add a DUPLICATE_ID to handle duplicates after a resend. */
       if (connection.getContext().isFaultTolerant() && !messageSend.getProperties().containsKey(org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID.toString())) {
-         originalCoreMsg.putStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID.toString(), messageSend.getMessageId().toString());
+         originalCoreMsg.putStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID, SimpleString.toSimpleString(messageSend.getMessageId().toString()));
       }
 
       final boolean shouldBlockProducer = producerInfo.getWindowSize() > 0 || messageSend.isResponseRequired();


[04/10] activemq-artemis git commit: ARTEMIS-1616 OpenWire improvements

Posted by cl...@apache.org.
ARTEMIS-1616 OpenWire improvements

Avoided copy of CoreMessage when not needed and cached lambda on hot path


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/2db4eafc
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/2db4eafc
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/2db4eafc

Branch: refs/heads/master
Commit: 2db4eafc4d81cd18f9394e356ee11b202f5f2301
Parents: 04a9884
Author: Francesco Nigro <ni...@gmail.com>
Authored: Tue Jan 16 14:24:32 2018 +0100
Committer: Clebert Suconic <cl...@apache.org>
Committed: Mon Jan 22 18:02:03 2018 -0500

----------------------------------------------------------------------
 .../core/protocol/openwire/amq/AMQSession.java  | 33 +++++++++++---------
 1 file changed, 19 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2db4eafc/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
index d284d6c..3ff3ae1 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
@@ -85,6 +85,8 @@ public class AMQSession implements SessionCallback {
 
    private final OpenWireProtocolManager protocolManager;
 
+   private final Runnable enableAutoReadAndTtl;
+
    private final CoreMessageObjectPools coreMessageObjectPools = new CoreMessageObjectPools();
 
    public AMQSession(ConnectionInfo connInfo,
@@ -102,6 +104,7 @@ public class AMQSession implements SessionCallback {
       OpenWireFormat marshaller = (OpenWireFormat) connection.getMarshaller();
 
       this.converter = new OpenWireMessageConverter(marshaller.copy());
+      this.enableAutoReadAndTtl = this::enableAutoReadAndTtl;
    }
 
    public boolean isClosed() {
@@ -325,7 +328,7 @@ public class AMQSession implements SessionCallback {
                     boolean sendProducerAck) throws Exception {
       messageSend.setBrokerInTime(System.currentTimeMillis());
 
-      ActiveMQDestination destination = messageSend.getDestination();
+      final ActiveMQDestination destination = messageSend.getDestination();
 
       ActiveMQDestination[] actualDestinations = null;
       if (destination.isComposite()) {
@@ -335,7 +338,7 @@ public class AMQSession implements SessionCallback {
          actualDestinations = new ActiveMQDestination[]{destination};
       }
 
-      org.apache.activemq.artemis.api.core.Message originalCoreMsg = getConverter().inbound(messageSend, coreMessageObjectPools);
+      final org.apache.activemq.artemis.api.core.Message originalCoreMsg = getConverter().inbound(messageSend, coreMessageObjectPools);
 
       originalCoreMsg.putStringProperty(MessageUtil.CONNECTION_ID_PROPERTY_NAME.toString(), this.connection.getState().getInfo().getClientId());
 
@@ -356,14 +359,15 @@ public class AMQSession implements SessionCallback {
          connection.getContext().setDontSendReponse(true);
       }
 
-      for (int i = 0; i < actualDestinations.length; i++) {
-         ActiveMQDestination dest = actualDestinations[i];
-         SimpleString address = SimpleString.toSimpleString(dest.getPhysicalName(), coreMessageObjectPools.getAddressStringSimpleStringPool());
-         org.apache.activemq.artemis.api.core.Message coreMsg = originalCoreMsg.copy();
+      for (int i = 0, actualDestinationsCount = actualDestinations.length; i < actualDestinationsCount; i++) {
+         final ActiveMQDestination dest = actualDestinations[i];
+         final SimpleString address = SimpleString.toSimpleString(dest.getPhysicalName(), coreMessageObjectPools.getAddressStringSimpleStringPool());
+         //the last coreMsg could be directly the original one -> it avoid 1 copy if actualDestinations > 1 and ANY copy if actualDestinations == 1
+         final org.apache.activemq.artemis.api.core.Message coreMsg = (i == actualDestinationsCount - 1) ? originalCoreMsg : originalCoreMsg.copy();
          coreMsg.setAddress(address);
 
-         if (actualDestinations[i].isQueue()) {
-            checkAutoCreateQueue(SimpleString.toSimpleString(actualDestinations[i].getPhysicalName(), coreMessageObjectPools.getAddressStringSimpleStringPool()), actualDestinations[i].isTemporary());
+         if (dest.isQueue()) {
+            checkAutoCreateQueue(address, dest.isTemporary());
             coreMsg.setRoutingType(RoutingType.ANYCAST);
          } else {
             coreMsg.setRoutingType(RoutingType.MULTICAST);
@@ -424,12 +428,8 @@ public class AMQSession implements SessionCallback {
             //non-persistent messages goes here, by default we stop reading from
             //transport
             connection.getTransportConnection().setAutoRead(false);
-            if (!store.checkMemory(() -> {
-               connection.getTransportConnection().setAutoRead(true);
-               connection.enableTtl();
-            })) {
-               connection.getTransportConnection().setAutoRead(true);
-               connection.enableTtl();
+            if (!store.checkMemory(enableAutoReadAndTtl)) {
+               enableAutoReadAndTtl();
                throw new ResourceAllocationException("Queue is full " + address);
             }
 
@@ -448,6 +448,11 @@ public class AMQSession implements SessionCallback {
       }
    }
 
+   private void enableAutoReadAndTtl() {
+      connection.getTransportConnection().setAutoRead(true);
+      connection.enableTtl();
+   }
+
    public String convertWildcard(String physicalName) {
       return OPENWIRE_WILDCARD.convert(physicalName, server.getConfiguration().getWildcardConfiguration());
    }


[08/10] activemq-artemis git commit: ARTEMIS-1616 OpenWire improvements

Posted by cl...@apache.org.
ARTEMIS-1616 OpenWire improvements

Optimized SimpleString::split because it is heavily used in AddressImpl::new


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/051a3cae
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/051a3cae
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/051a3cae

Branch: refs/heads/master
Commit: 051a3cae49c488314b9a5ccaa36056a358e4b9df
Parents: 54d0161
Author: Francesco Nigro <ni...@gmail.com>
Authored: Tue Jan 16 18:40:06 2018 +0100
Committer: Clebert Suconic <cl...@apache.org>
Committed: Mon Jan 22 18:02:03 2018 -0500

----------------------------------------------------------------------
 .../activemq/artemis/api/core/SimpleString.java | 65 ++++++++++++++++++++
 1 file changed, 65 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/051a3cae/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/SimpleString.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/SimpleString.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/SimpleString.java
index dbf7468..b3af64a 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/SimpleString.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/SimpleString.java
@@ -18,6 +18,7 @@ package org.apache.activemq.artemis.api.core;
 
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 
 import io.netty.buffer.ByteBuf;
@@ -34,6 +35,7 @@ import org.apache.activemq.artemis.utils.DataConstants;
  */
 public final class SimpleString implements CharSequence, Serializable, Comparable<SimpleString> {
 
+   private static final SimpleString EMPTY = new SimpleString("");
    private static final long serialVersionUID = 4204223851422244307L;
 
    // Attributes
@@ -323,6 +325,14 @@ public final class SimpleString implements CharSequence, Serializable, Comparabl
     * @return An array of SimpleStrings
     */
    public SimpleString[] split(final char delim) {
+      if (this.str != null) {
+         return splitWithCachedString(this, delim);
+      } else {
+         return splitWithoutCachedString(delim);
+      }
+   }
+
+   private SimpleString[] splitWithoutCachedString(final char delim) {
       List<SimpleString> all = null;
 
       byte low = (byte) (delim & 0xFF); // low byte
@@ -361,6 +371,58 @@ public final class SimpleString implements CharSequence, Serializable, Comparabl
       }
    }
 
+   private static SimpleString[] splitWithCachedString(final SimpleString simpleString, final int delim) {
+      final String str = simpleString.str;
+      final byte[] data = simpleString.data;
+      final int length = str.length();
+      List<SimpleString> all = null;
+      int index = 0;
+      while (index < length) {
+         final int delimIndex = str.indexOf(delim, index);
+         if (delimIndex == -1) {
+            //just need to add the last one
+            break;
+         } else {
+            all = addSimpleStringPart(all, data, index, delimIndex);
+         }
+         index = delimIndex + 1;
+      }
+      if (all == null) {
+         return new SimpleString[]{simpleString};
+      } else {
+         // Adding the last one
+         all = addSimpleStringPart(all, data, index, length);
+         // Converting it to arrays
+         final SimpleString[] parts = new SimpleString[all.size()];
+         return all.toArray(parts);
+      }
+   }
+
+   private static List<SimpleString> addSimpleStringPart(List<SimpleString> all,
+                                                         final byte[] data,
+                                                         final int startIndex,
+                                                         final int endIndex) {
+      final int expectedLength = endIndex - startIndex;
+      final SimpleString ss;
+      if (expectedLength == 0) {
+         ss = EMPTY;
+      } else {
+         //extract a byte[] copy from this
+         final int ssIndex = startIndex << 1;
+         final int delIndex = endIndex << 1;
+         final byte[] bytes = Arrays.copyOfRange(data, ssIndex, delIndex);
+         ss = new SimpleString(bytes);
+      }
+      // We will create the ArrayList lazily
+      if (all == null) {
+         // There will be at least 3 strings on this case (which is the actual common usecase)
+         // For that reason I'm allocating the ArrayList with 3 already
+         all = new ArrayList<>(3);
+      }
+      all.add(ss);
+      return all;
+   }
+
    /**
     * checks to see if this SimpleString contains the char parameter passed in
     *
@@ -368,6 +430,9 @@ public final class SimpleString implements CharSequence, Serializable, Comparabl
     * @return true if the char is found, false otherwise.
     */
    public boolean contains(final char c) {
+      if (this.str != null) {
+         return this.str.indexOf(c) != -1;
+      }
       final byte low = (byte) (c & 0xFF); // low byte
       final byte high = (byte) (c >> 8 & 0xFF); // high byte
 


[02/10] activemq-artemis git commit: ARTEMIS-1616 OpenWire improvements

Posted by cl...@apache.org.
ARTEMIS-1616 OpenWire improvements

Refactored OpenWireMessageConverter::toAMQMessage into smaller methods


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/c6b6dd95
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/c6b6dd95
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/c6b6dd95

Branch: refs/heads/master
Commit: c6b6dd95d1665230d667557df240d8a62a2118af
Parents: e7a1dca
Author: Francesco Nigro <ni...@gmail.com>
Authored: Wed Jan 17 14:37:08 2018 +0100
Committer: Clebert Suconic <cl...@apache.org>
Committed: Mon Jan 22 18:02:03 2018 -0500

----------------------------------------------------------------------
 .../openwire/OpenWireMessageConverter.java      | 562 +++++++++++--------
 1 file changed, 316 insertions(+), 246 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c6b6dd95/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
index 0948f8a..3dc4a4e 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
@@ -120,7 +120,7 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
       return null;
    }
 
-//   @Override
+   //   @Override
    public org.apache.activemq.artemis.api.core.Message inbound(Object message, CoreMessageObjectPools coreMessageObjectPools) throws Exception {
 
       final Message messageSend = (Message) message;
@@ -205,7 +205,9 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
 
       final ProducerId producerId = messageSend.getProducerId();
       if (producerId != null) {
-         putMsgProducerId(producerId, marshaller, coreMessage);
+         final ByteSequence producerIdBytes = marshaller.marshal(producerId);
+         producerIdBytes.compact();
+         coreMessage.putBytesProperty(AMQ_MSG_PRODUCER_ID, producerIdBytes.data);
       }
       final ByteSequence propBytes = messageSend.getMarshalledProperties();
       if (propBytes != null) {
@@ -437,14 +439,6 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
       coreMessage.putBytesProperty(AMQ_MSG_DATASTRUCTURE, dsBytes.data);
    }
 
-   private static void putMsgProducerId(final ProducerId producerId,
-                                        final WireFormat marshaller,
-                                        final CoreMessage coreMessage) throws IOException {
-      final ByteSequence producerIdBytes = marshaller.marshal(producerId);
-      producerIdBytes.compact();
-      coreMessage.putBytesProperty(AMQ_MSG_PRODUCER_ID, producerIdBytes.data);
-   }
-
    private static void putMsgMarshalledProperties(final ByteSequence propBytes,
                                                   final Message messageSend,
                                                   final CoreMessage coreMessage) throws IOException {
@@ -512,9 +506,9 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
    }
 
    public MessageDispatch createMessageDispatch(MessageReference reference,
-                                                       ICoreMessage message,
-                                                       AMQConsumer consumer) throws IOException, JMSException {
-      ActiveMQMessage amqMessage = toAMQMessage(reference, message, consumer);
+                                                ICoreMessage message,
+                                                AMQConsumer consumer) throws IOException, JMSException {
+      ActiveMQMessage amqMessage = toAMQMessage(reference, message, marshaller, consumer);
 
       //we can use core message id for sequenceId
       amqMessage.getMessageId().setBrokerSequenceId(message.getMessageID());
@@ -529,35 +523,48 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
       return md;
    }
 
-   private ActiveMQMessage toAMQMessage(MessageReference reference,
+   private static ActiveMQMessage toAMQMessage(MessageReference reference,
                                                ICoreMessage coreMessage,
+                                               WireFormat marshaller,
                                                AMQConsumer consumer) throws IOException {
-      ActiveMQMessage amqMsg = null;
-      byte coreType = coreMessage.getType();
+      final ActiveMQMessage amqMsg;
+      final byte coreType = coreMessage.getType();
+      final Boolean compressProp = (Boolean) coreMessage.getObjectProperty(AMQ_MSG_COMPRESSED);
+      final boolean isCompressed = compressProp == null ? false : compressProp.booleanValue();
+      final byte[] bytes;
+      final ActiveMQBuffer buffer = coreMessage.getReadOnlyBodyBuffer();
+      buffer.resetReaderIndex();
+
       switch (coreType) {
          case org.apache.activemq.artemis.api.core.Message.BYTES_TYPE:
             amqMsg = new ActiveMQBytesMessage();
+            bytes = toAMQMessageBytesType(buffer, isCompressed);
             break;
          case org.apache.activemq.artemis.api.core.Message.MAP_TYPE:
             amqMsg = new ActiveMQMapMessage();
+            bytes = toAMQMessageMapType(buffer, isCompressed);
             break;
          case org.apache.activemq.artemis.api.core.Message.OBJECT_TYPE:
             amqMsg = new ActiveMQObjectMessage();
+            bytes = toAMQMessageObjectType(buffer, isCompressed);
             break;
          case org.apache.activemq.artemis.api.core.Message.STREAM_TYPE:
             amqMsg = new ActiveMQStreamMessage();
+            bytes = toAMQMessageStreamType(buffer, isCompressed);
             break;
          case org.apache.activemq.artemis.api.core.Message.TEXT_TYPE:
             amqMsg = new ActiveMQTextMessage();
+            bytes = toAMQMessageTextType(buffer, isCompressed);
             break;
          case org.apache.activemq.artemis.api.core.Message.DEFAULT_TYPE:
             amqMsg = new ActiveMQMessage();
+            bytes = toAMQMessageDefaultType(buffer, isCompressed);
             break;
          default:
             throw new IllegalStateException("Unknown message type: " + coreMessage.getType());
       }
 
-      String type = coreMessage.getStringProperty(JMS_TYPE_PROPERTY);
+      final String type = coreMessage.getStringProperty(JMS_TYPE_PROPERTY);
       if (type != null) {
          amqMsg.setJMSType(type);
       }
@@ -572,165 +579,8 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
       }
       amqMsg.setBrokerInTime(brokerInTime);
 
-      ActiveMQBuffer buffer = coreMessage.getReadOnlyBodyBuffer();
-      Boolean compressProp = (Boolean) coreMessage.getObjectProperty(AMQ_MSG_COMPRESSED);
-      boolean isCompressed = compressProp == null ? false : compressProp.booleanValue();
       amqMsg.setCompressed(isCompressed);
 
-      byte[] bytes = null;
-      if (buffer != null) {
-         buffer.resetReaderIndex();
-
-         if (coreType == org.apache.activemq.artemis.api.core.Message.TEXT_TYPE) {
-            SimpleString text = buffer.readNullableSimpleString();
-            if (text != null) {
-               ByteArrayOutputStream bytesOut = new ByteArrayOutputStream(text.length() + 4);
-               OutputStream out = bytesOut;
-               if (isCompressed) {
-                  out = new DeflaterOutputStream(out, true);
-               }
-               try (DataOutputStream dataOut = new DataOutputStream(out)) {
-                  MarshallingSupport.writeUTF8(dataOut, text.toString());
-                  dataOut.flush();
-                  bytes = bytesOut.toByteArray();
-               }
-            }
-         } else if (coreType == org.apache.activemq.artemis.api.core.Message.MAP_TYPE) {
-            TypedProperties mapData = new TypedProperties();
-            //it could be a null map
-            if (buffer.readableBytes() > 0) {
-               mapData.decode(buffer.byteBuf());
-               Map<String, Object> map = mapData.getMap();
-               ByteArrayOutputStream out = new ByteArrayOutputStream(mapData.getEncodeSize());
-               OutputStream os = out;
-               if (isCompressed) {
-                  os = new DeflaterOutputStream(os, true);
-               }
-               try (DataOutputStream dataOut = new DataOutputStream(os)) {
-                  MarshallingSupport.marshalPrimitiveMap(map, dataOut);
-                  dataOut.flush();
-               }
-               bytes = out.toByteArray();
-            }
-
-         } else if (coreType == org.apache.activemq.artemis.api.core.Message.OBJECT_TYPE) {
-            if (buffer.readableBytes() > 0) {
-               int len = buffer.readInt();
-               bytes = new byte[len];
-               buffer.readBytes(bytes);
-               if (isCompressed) {
-                  ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
-                  try (DeflaterOutputStream out = new DeflaterOutputStream(bytesOut, true)) {
-                     out.write(bytes);
-                     out.flush();
-                  }
-                  bytes = bytesOut.toByteArray();
-               }
-            }
-         } else if (coreType == org.apache.activemq.artemis.api.core.Message.STREAM_TYPE) {
-            org.apache.activemq.util.ByteArrayOutputStream bytesOut = new org.apache.activemq.util.ByteArrayOutputStream();
-            OutputStream out = bytesOut;
-            if (isCompressed) {
-               out = new DeflaterOutputStream(bytesOut, true);
-            }
-            try (DataOutputStream dataOut = new DataOutputStream(out)) {
-
-               boolean stop = false;
-               while (!stop && buffer.readable()) {
-                  byte primitiveType = buffer.readByte();
-                  switch (primitiveType) {
-                     case DataConstants.BOOLEAN:
-                        MarshallingSupport.marshalBoolean(dataOut, buffer.readBoolean());
-                        break;
-                     case DataConstants.BYTE:
-                        MarshallingSupport.marshalByte(dataOut, buffer.readByte());
-                        break;
-                     case DataConstants.BYTES:
-                        int len = buffer.readInt();
-                        byte[] bytesData = new byte[len];
-                        buffer.readBytes(bytesData);
-                        MarshallingSupport.marshalByteArray(dataOut, bytesData);
-                        break;
-                     case DataConstants.CHAR:
-                        char ch = (char) buffer.readShort();
-                        MarshallingSupport.marshalChar(dataOut, ch);
-                        break;
-                     case DataConstants.DOUBLE:
-                        double doubleVal = Double.longBitsToDouble(buffer.readLong());
-                        MarshallingSupport.marshalDouble(dataOut, doubleVal);
-                        break;
-                     case DataConstants.FLOAT:
-                        Float floatVal = Float.intBitsToFloat(buffer.readInt());
-                        MarshallingSupport.marshalFloat(dataOut, floatVal);
-                        break;
-                     case DataConstants.INT:
-                        MarshallingSupport.marshalInt(dataOut, buffer.readInt());
-                        break;
-                     case DataConstants.LONG:
-                        MarshallingSupport.marshalLong(dataOut, buffer.readLong());
-                        break;
-                     case DataConstants.SHORT:
-                        MarshallingSupport.marshalShort(dataOut, buffer.readShort());
-                        break;
-                     case DataConstants.STRING:
-                        String string = buffer.readNullableString();
-                        if (string == null) {
-                           MarshallingSupport.marshalNull(dataOut);
-                        } else {
-                           MarshallingSupport.marshalString(dataOut, string);
-                        }
-                        break;
-                     default:
-                        //now we stop
-                        stop = true;
-                        break;
-                  }
-                  dataOut.flush();
-               }
-            }
-            bytes = bytesOut.toByteArray();
-         } else if (coreType == org.apache.activemq.artemis.api.core.Message.BYTES_TYPE) {
-            int n = buffer.readableBytes();
-            bytes = new byte[n];
-            buffer.readBytes(bytes);
-            if (isCompressed) {
-               int length = bytes.length;
-               Deflater deflater = new Deflater();
-               try (org.apache.activemq.util.ByteArrayOutputStream compressed = new org.apache.activemq.util.ByteArrayOutputStream()) {
-                  compressed.write(new byte[4]);
-                  deflater.setInput(bytes);
-                  deflater.finish();
-                  byte[] bytesBuf = new byte[1024];
-                  while (!deflater.finished()) {
-                     int count = deflater.deflate(bytesBuf);
-                     compressed.write(bytesBuf, 0, count);
-                  }
-                  compressed.flush();
-                  ByteSequence byteSeq = compressed.toByteSequence();
-                  ByteSequenceData.writeIntBig(byteSeq, length);
-                  bytes = Arrays.copyOfRange(byteSeq.data, 0, byteSeq.length);
-               } finally {
-                  deflater.end();
-               }
-            }
-         } else {
-            int n = buffer.readableBytes();
-            bytes = new byte[n];
-            buffer.readBytes(bytes);
-            if (isCompressed) {
-               try (ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
-                    DeflaterOutputStream out = new DeflaterOutputStream(bytesOut, true)) {
-                  out.write(bytes);
-                  out.flush();
-                  bytes = bytesOut.toByteArray();
-               }
-            }
-         }
-
-         buffer.resetReaderIndex();// this is important for topics as the buffer
-         // may be read multiple times
-      }
-
       //we need check null because messages may come from other clients
       //and those amq specific attribute may not be set.
       Long arrival = (Long) coreMessage.getObjectProperty(AMQ_MSG_ARRIVAL);
@@ -740,24 +590,14 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
       }
       amqMsg.setArrival(arrival);
 
-      String brokerPath = (String) coreMessage.getObjectProperty(AMQ_MSG_BROKER_PATH);
-      if (brokerPath != null && brokerPath.isEmpty()) {
-         String[] brokers = brokerPath.split(",");
-         BrokerId[] bids = new BrokerId[brokers.length];
-         for (int i = 0; i < bids.length; i++) {
-            bids[i] = new BrokerId(brokers[i]);
-         }
-         amqMsg.setBrokerPath(bids);
+      final String brokerPath = (String) coreMessage.getObjectProperty(AMQ_MSG_BROKER_PATH);
+      if (brokerPath != null && !brokerPath.isEmpty()) {
+         setAMQMsgBrokerPath(amqMsg, brokerPath);
       }
 
-      String clusterPath = (String) coreMessage.getObjectProperty(AMQ_MSG_CLUSTER);
-      if (clusterPath != null && clusterPath.isEmpty()) {
-         String[] cluster = clusterPath.split(",");
-         BrokerId[] bids = new BrokerId[cluster.length];
-         for (int i = 0; i < bids.length; i++) {
-            bids[i] = new BrokerId(cluster[i]);
-         }
-         amqMsg.setCluster(bids);
+      final String clusterPath = (String) coreMessage.getObjectProperty(AMQ_MSG_CLUSTER);
+      if (clusterPath != null && !clusterPath.isEmpty()) {
+         setAMQMsgClusterPath(amqMsg, clusterPath);
       }
 
       Integer commandId = (Integer) coreMessage.getObjectProperty(AMQ_MSG_COMMAND_ID);
@@ -766,21 +606,19 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
       }
       amqMsg.setCommandId(commandId);
 
-      SimpleString corrId = (SimpleString) coreMessage.getObjectProperty(JMS_CORRELATION_ID_PROPERTY);
+      final SimpleString corrId = (SimpleString) coreMessage.getObjectProperty(JMS_CORRELATION_ID_PROPERTY);
       if (corrId != null) {
          amqMsg.setCorrelationId(corrId.toString());
       }
 
-      byte[] dsBytes = (byte[]) coreMessage.getObjectProperty(AMQ_MSG_DATASTRUCTURE);
+      final byte[] dsBytes = (byte[]) coreMessage.getObjectProperty(AMQ_MSG_DATASTRUCTURE);
       if (dsBytes != null) {
-         ByteSequence seq = new ByteSequence(dsBytes);
-         DataStructure ds = (DataStructure) marshaller.unmarshal(seq);
-         amqMsg.setDataStructure(ds);
+         setAMQMsgDataStructure(amqMsg, marshaller, dsBytes);
       }
       final ActiveMQDestination actualDestination = consumer.getOpenwireDestination();
       amqMsg.setDestination(OpenWireUtil.toAMQAddress(coreMessage, actualDestination));
 
-      Object value = coreMessage.getGroupID();
+      final Object value = coreMessage.getGroupID();
       if (value != null) {
          String groupId = value.toString();
          amqMsg.setGroupID(groupId);
@@ -792,8 +630,8 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
       }
       amqMsg.setGroupSequence(groupSequence);
 
-      MessageId mid = null;
-      byte[] midBytes = (byte[]) coreMessage.getObjectProperty(AMQ_MSG_MESSAGE_ID);
+      final MessageId mid;
+      final byte[] midBytes = (byte[]) coreMessage.getObjectProperty(AMQ_MSG_MESSAGE_ID);
       if (midBytes != null) {
          ByteSequence midSeq = new ByteSequence(midBytes);
          mid = (MessageId) marshaller.unmarshal(midSeq);
@@ -803,97 +641,329 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
 
       amqMsg.setMessageId(mid);
 
-      byte[] origDestBytes = (byte[]) coreMessage.getObjectProperty(AMQ_MSG_ORIG_DESTINATION);
+      final byte[] origDestBytes = (byte[]) coreMessage.getObjectProperty(AMQ_MSG_ORIG_DESTINATION);
       if (origDestBytes != null) {
-         ActiveMQDestination origDest = (ActiveMQDestination) marshaller.unmarshal(new ByteSequence(origDestBytes));
-         amqMsg.setOriginalDestination(origDest);
+         setAMQMsgOriginalDestination(amqMsg, marshaller, origDestBytes);
       }
 
-      byte[] origTxIdBytes = (byte[]) coreMessage.getObjectProperty(AMQ_MSG_ORIG_TXID);
+      final byte[] origTxIdBytes = (byte[]) coreMessage.getObjectProperty(AMQ_MSG_ORIG_TXID);
       if (origTxIdBytes != null) {
-         TransactionId origTxId = (TransactionId) marshaller.unmarshal(new ByteSequence(origTxIdBytes));
-         amqMsg.setOriginalTransactionId(origTxId);
+         setAMQMsgOriginalTransactionId(amqMsg, marshaller, origTxIdBytes);
       }
 
-      byte[] producerIdBytes = (byte[]) coreMessage.getObjectProperty(AMQ_MSG_PRODUCER_ID);
+      final byte[] producerIdBytes = (byte[]) coreMessage.getObjectProperty(AMQ_MSG_PRODUCER_ID);
       if (producerIdBytes != null) {
          ProducerId producerId = (ProducerId) marshaller.unmarshal(new ByteSequence(producerIdBytes));
          amqMsg.setProducerId(producerId);
       }
 
-      byte[] marshalledBytes = (byte[]) coreMessage.getObjectProperty(AMQ_MSG_MARSHALL_PROP);
+      final byte[] marshalledBytes = (byte[]) coreMessage.getObjectProperty(AMQ_MSG_MARSHALL_PROP);
       if (marshalledBytes != null) {
          amqMsg.setMarshalledProperties(new ByteSequence(marshalledBytes));
       }
 
       amqMsg.setRedeliveryCounter(reference.getDeliveryCount() - 1);
 
-      byte[] replyToBytes = (byte[]) coreMessage.getObjectProperty(AMQ_MSG_REPLY_TO);
+      final byte[] replyToBytes = (byte[]) coreMessage.getObjectProperty(AMQ_MSG_REPLY_TO);
       if (replyToBytes != null) {
-         ActiveMQDestination replyTo = (ActiveMQDestination) marshaller.unmarshal(new ByteSequence(replyToBytes));
-         amqMsg.setReplyTo(replyTo);
+         setAMQMsgReplyTo(amqMsg, marshaller, replyToBytes);
       }
 
-      String userId = (String) coreMessage.getObjectProperty(AMQ_MSG_USER_ID);
+      final String userId = (String) coreMessage.getObjectProperty(AMQ_MSG_USER_ID);
       if (userId != null) {
          amqMsg.setUserID(userId);
       }
 
-      Boolean isDroppable = (Boolean) coreMessage.getObjectProperty(AMQ_MSG_DROPPABLE);
+      final Boolean isDroppable = (Boolean) coreMessage.getObjectProperty(AMQ_MSG_DROPPABLE);
       if (isDroppable != null) {
          amqMsg.setDroppable(isDroppable);
       }
 
-      SimpleString dlqCause = (SimpleString) coreMessage.getObjectProperty(AMQ_MSG_DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY);
+      final SimpleString dlqCause = (SimpleString) coreMessage.getObjectProperty(AMQ_MSG_DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY);
       if (dlqCause != null) {
-         try {
-            amqMsg.setStringProperty(ActiveMQMessage.DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY, dlqCause.toString());
-         } catch (JMSException e) {
-            throw new IOException("failure to set dlq property " + dlqCause, e);
-         }
+         setAMQMsgDlqDeliveryFailureCause(amqMsg, dlqCause);
       }
 
-      SimpleString lastValueProperty = coreMessage.getLastValueProperty();
+      final SimpleString lastValueProperty = coreMessage.getLastValueProperty();
       if (lastValueProperty != null) {
-         try {
-            amqMsg.setStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_LAST_VALUE_NAME.toString(), lastValueProperty.toString());
-         } catch (JMSException e) {
-            throw new IOException("failure to set lvq property " + dlqCause, e);
-         }
+         setAMQMsgHdrLastValueName(amqMsg, lastValueProperty);
       }
 
-      Set<SimpleString> props = coreMessage.getPropertyNames();
+      final Set<SimpleString> props = coreMessage.getPropertyNames();
       if (props != null) {
-         for (SimpleString s : props) {
-            String keyStr = s.toString();
-            if ((keyStr.startsWith("_AMQ") || keyStr.startsWith("__HDR_")) &&
-                    !consumer.hasNotificationDestination()) {
-               continue;
+         setAMQMsgObjectProperties(amqMsg, coreMessage, props, consumer);
+      }
+
+      if (bytes != null) {
+         ByteSequence content = new ByteSequence(bytes);
+         amqMsg.setContent(content);
+      }
+      return amqMsg;
+   }
+
+   private static byte[] toAMQMessageTextType(final ActiveMQBuffer buffer,
+                                              final boolean isCompressed) throws IOException {
+      byte[] bytes = null;
+      SimpleString text = buffer.readNullableSimpleString();
+      if (text != null) {
+         ByteArrayOutputStream bytesOut = new ByteArrayOutputStream(text.length() + 4);
+         OutputStream out = bytesOut;
+         if (isCompressed) {
+            out = new DeflaterOutputStream(out, true);
+         }
+         try (DataOutputStream dataOut = new DataOutputStream(out)) {
+            MarshallingSupport.writeUTF8(dataOut, text.toString());
+            dataOut.flush();
+            bytes = bytesOut.toByteArray();
+         }
+      }
+      return bytes;
+   }
+
+   private static byte[] toAMQMessageMapType(final ActiveMQBuffer buffer,
+                                             final boolean isCompressed) throws IOException {
+      byte[] bytes = null;
+      //it could be a null map
+      if (buffer.readableBytes() > 0) {
+         TypedProperties mapData = new TypedProperties();
+         mapData.decode(buffer.byteBuf());
+         Map<String, Object> map = mapData.getMap();
+         ByteArrayOutputStream out = new ByteArrayOutputStream(mapData.getEncodeSize());
+         OutputStream os = out;
+         if (isCompressed) {
+            os = new DeflaterOutputStream(os, true);
+         }
+         try (DataOutputStream dataOut = new DataOutputStream(os)) {
+            MarshallingSupport.marshalPrimitiveMap(map, dataOut);
+            dataOut.flush();
+         }
+         bytes = out.toByteArray();
+      }
+      return bytes;
+   }
+
+   private static byte[] toAMQMessageObjectType(final ActiveMQBuffer buffer,
+                                                final boolean isCompressed) throws IOException {
+      byte[] bytes = null;
+      if (buffer.readableBytes() > 0) {
+         int len = buffer.readInt();
+         bytes = new byte[len];
+         buffer.readBytes(bytes);
+         if (isCompressed) {
+            ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
+            try (DeflaterOutputStream out = new DeflaterOutputStream(bytesOut, true)) {
+               out.write(bytes);
+               out.flush();
             }
-            Object prop = coreMessage.getObjectProperty(s);
-            try {
-               if (prop instanceof SimpleString) {
-                  amqMsg.setObjectProperty(s.toString(), prop.toString());
-               } else {
-                  if (keyStr.equals(MessageUtil.JMSXDELIVERYCOUNT) && prop instanceof Long) {
-                     Long l = (Long) prop;
-                     amqMsg.setObjectProperty(s.toString(), l.intValue());
+            bytes = bytesOut.toByteArray();
+         }
+      }
+      return bytes;
+   }
+
+   private static byte[] toAMQMessageStreamType(final ActiveMQBuffer buffer,
+                                                final boolean isCompressed) throws IOException {
+      byte[] bytes;
+      org.apache.activemq.util.ByteArrayOutputStream bytesOut = new org.apache.activemq.util.ByteArrayOutputStream();
+      OutputStream out = bytesOut;
+      if (isCompressed) {
+         out = new DeflaterOutputStream(bytesOut, true);
+      }
+      try (DataOutputStream dataOut = new DataOutputStream(out)) {
+
+         boolean stop = false;
+         while (!stop && buffer.readable()) {
+            byte primitiveType = buffer.readByte();
+            switch (primitiveType) {
+               case DataConstants.BOOLEAN:
+                  MarshallingSupport.marshalBoolean(dataOut, buffer.readBoolean());
+                  break;
+               case DataConstants.BYTE:
+                  MarshallingSupport.marshalByte(dataOut, buffer.readByte());
+                  break;
+               case DataConstants.BYTES:
+                  int len = buffer.readInt();
+                  byte[] bytesData = new byte[len];
+                  buffer.readBytes(bytesData);
+                  MarshallingSupport.marshalByteArray(dataOut, bytesData);
+                  break;
+               case DataConstants.CHAR:
+                  char ch = (char) buffer.readShort();
+                  MarshallingSupport.marshalChar(dataOut, ch);
+                  break;
+               case DataConstants.DOUBLE:
+                  double doubleVal = Double.longBitsToDouble(buffer.readLong());
+                  MarshallingSupport.marshalDouble(dataOut, doubleVal);
+                  break;
+               case DataConstants.FLOAT:
+                  Float floatVal = Float.intBitsToFloat(buffer.readInt());
+                  MarshallingSupport.marshalFloat(dataOut, floatVal);
+                  break;
+               case DataConstants.INT:
+                  MarshallingSupport.marshalInt(dataOut, buffer.readInt());
+                  break;
+               case DataConstants.LONG:
+                  MarshallingSupport.marshalLong(dataOut, buffer.readLong());
+                  break;
+               case DataConstants.SHORT:
+                  MarshallingSupport.marshalShort(dataOut, buffer.readShort());
+                  break;
+               case DataConstants.STRING:
+                  String string = buffer.readNullableString();
+                  if (string == null) {
+                     MarshallingSupport.marshalNull(dataOut);
                   } else {
-                     amqMsg.setObjectProperty(s.toString(), prop);
+                     MarshallingSupport.marshalString(dataOut, string);
                   }
-               }
-            } catch (JMSException e) {
-               throw new IOException("exception setting property " + s + " : " + prop, e);
+                  break;
+               default:
+                  //now we stop
+                  stop = true;
+                  break;
             }
+            dataOut.flush();
          }
       }
+      bytes = bytesOut.toByteArray();
+      return bytes;
+   }
 
-      amqMsg.setCompressed(isCompressed);
-      if (bytes != null) {
-         ByteSequence content = new ByteSequence(bytes);
-         amqMsg.setContent(content);
+   private static byte[] toAMQMessageBytesType(final ActiveMQBuffer buffer,
+                                               final boolean isCompressed) throws IOException {
+      int n = buffer.readableBytes();
+      byte[] bytes = new byte[n];
+      buffer.readBytes(bytes);
+      if (isCompressed) {
+         bytes = toAMQMessageCompressedBytesType(bytes);
+      }
+      return bytes;
+   }
+
+   private static byte[] toAMQMessageCompressedBytesType(final byte[] bytes) throws IOException {
+      int length = bytes.length;
+      Deflater deflater = new Deflater();
+      try (org.apache.activemq.util.ByteArrayOutputStream compressed = new org.apache.activemq.util.ByteArrayOutputStream()) {
+         compressed.write(new byte[4]);
+         deflater.setInput(bytes);
+         deflater.finish();
+         byte[] bytesBuf = new byte[1024];
+         while (!deflater.finished()) {
+            int count = deflater.deflate(bytesBuf);
+            compressed.write(bytesBuf, 0, count);
+         }
+         compressed.flush();
+         ByteSequence byteSeq = compressed.toByteSequence();
+         ByteSequenceData.writeIntBig(byteSeq, length);
+         return Arrays.copyOfRange(byteSeq.data, 0, byteSeq.length);
+      } finally {
+         deflater.end();
       }
-      return amqMsg;
    }
 
+   private static byte[] toAMQMessageDefaultType(final ActiveMQBuffer buffer,
+                                                 final boolean isCompressed) throws IOException {
+      int n = buffer.readableBytes();
+      byte[] bytes = new byte[n];
+      buffer.readBytes(bytes);
+      if (isCompressed) {
+         try (ByteArrayOutputStream bytesOut = new ByteArrayOutputStream(); DeflaterOutputStream out = new DeflaterOutputStream(bytesOut, true)) {
+            out.write(bytes);
+            out.flush();
+            bytes = bytesOut.toByteArray();
+         }
+      }
+      return bytes;
+   }
+
+   private static void setAMQMsgBrokerPath(final ActiveMQMessage amqMsg, final String brokerPath) {
+      String[] brokers = brokerPath.split(",");
+      BrokerId[] bids = new BrokerId[brokers.length];
+      for (int i = 0; i < bids.length; i++) {
+         bids[i] = new BrokerId(brokers[i]);
+      }
+      amqMsg.setBrokerPath(bids);
+   }
+
+   private static void setAMQMsgClusterPath(final ActiveMQMessage amqMsg, final String clusterPath) {
+      String[] cluster = clusterPath.split(",");
+      BrokerId[] bids = new BrokerId[cluster.length];
+      for (int i = 0; i < bids.length; i++) {
+         bids[i] = new BrokerId(cluster[i]);
+      }
+      amqMsg.setCluster(bids);
+   }
+
+   private static void setAMQMsgDataStructure(final ActiveMQMessage amqMsg,
+                                              final WireFormat marshaller,
+                                              final byte[] dsBytes) throws IOException {
+      ByteSequence seq = new ByteSequence(dsBytes);
+      DataStructure ds = (DataStructure) marshaller.unmarshal(seq);
+      amqMsg.setDataStructure(ds);
+   }
+
+   private static void setAMQMsgOriginalDestination(final ActiveMQMessage amqMsg,
+                                                    final WireFormat marshaller,
+                                                    final byte[] origDestBytes) throws IOException {
+      ActiveMQDestination origDest = (ActiveMQDestination) marshaller.unmarshal(new ByteSequence(origDestBytes));
+      amqMsg.setOriginalDestination(origDest);
+   }
+
+   private static void setAMQMsgOriginalTransactionId(final ActiveMQMessage amqMsg,
+                                                      final WireFormat marshaller,
+                                                      final byte[] origTxIdBytes) throws IOException {
+      TransactionId origTxId = (TransactionId) marshaller.unmarshal(new ByteSequence(origTxIdBytes));
+      amqMsg.setOriginalTransactionId(origTxId);
+   }
+
+   private static void setAMQMsgReplyTo(final ActiveMQMessage amqMsg,
+                                        final WireFormat marshaller,
+                                        final byte[] replyToBytes) throws IOException {
+      ActiveMQDestination replyTo = (ActiveMQDestination) marshaller.unmarshal(new ByteSequence(replyToBytes));
+      amqMsg.setReplyTo(replyTo);
+   }
+
+   private static void setAMQMsgDlqDeliveryFailureCause(final ActiveMQMessage amqMsg,
+                                                        final SimpleString dlqCause) throws IOException {
+      try {
+         amqMsg.setStringProperty(ActiveMQMessage.DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY, dlqCause.toString());
+      } catch (JMSException e) {
+         throw new IOException("failure to set dlq property " + dlqCause, e);
+      }
+   }
+
+   private static void setAMQMsgHdrLastValueName(final ActiveMQMessage amqMsg,
+                                                 final SimpleString lastValueProperty) throws IOException {
+      try {
+         amqMsg.setStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_LAST_VALUE_NAME.toString(), lastValueProperty.toString());
+      } catch (JMSException e) {
+         throw new IOException("failure to set lvq property " + lastValueProperty, e);
+      }
+   }
+
+   private static void setAMQMsgObjectProperties(final ActiveMQMessage amqMsg,
+                                                 final ICoreMessage coreMessage,
+                                                 final Set<SimpleString> props,
+                                                 final AMQConsumer consumer) throws IOException {
+      for (SimpleString s : props) {
+         final String keyStr = s.toString();
+         if (!consumer.hasNotificationDestination() && (keyStr.startsWith("_AMQ") || keyStr.startsWith("__HDR_"))) {
+            continue;
+         }
+         final Object prop = coreMessage.getObjectProperty(s);
+         try {
+            if (prop instanceof SimpleString) {
+               amqMsg.setObjectProperty(keyStr, prop.toString());
+            } else {
+               if (keyStr.equals(MessageUtil.JMSXDELIVERYCOUNT) && prop instanceof Long) {
+                  Long l = (Long) prop;
+                  amqMsg.setObjectProperty(keyStr, l.intValue());
+               } else {
+                  amqMsg.setObjectProperty(keyStr, prop);
+               }
+            }
+         } catch (JMSException e) {
+            throw new IOException("exception setting property " + s + " : " + prop, e);
+         }
+      }
+   }
 }


[09/10] activemq-artemis git commit: ARTEMIS-1616 OpenWire improvements

Posted by cl...@apache.org.
ARTEMIS-1616 OpenWire improvements

Cached Notification Destination check on AMQConsumer to avoid expensive ActiveMQDestination::toString


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/9650c80b
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/9650c80b
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/9650c80b

Branch: refs/heads/master
Commit: 9650c80ba701acbf727f379bd155551cdd663b47
Parents: 64724c3
Author: Francesco Nigro <ni...@gmail.com>
Authored: Wed Jan 17 00:37:24 2018 +0100
Committer: Clebert Suconic <cl...@apache.org>
Committed: Mon Jan 22 18:02:03 2018 -0500

----------------------------------------------------------------------
 .../core/protocol/openwire/OpenWireMessageConverter.java  | 10 ++++------
 .../artemis/core/protocol/openwire/amq/AMQConsumer.java   |  9 ++++++++-
 2 files changed, 12 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9650c80b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
index 457593d..0948f8a 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
@@ -97,8 +97,6 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
    private static final SimpleString AMQ_MSG_DROPPABLE =  new SimpleString(AMQ_PREFIX + "DROPPABLE");
    private static final SimpleString AMQ_MSG_COMPRESSED = new SimpleString(AMQ_PREFIX + "COMPRESSED");
 
-   private static final String AMQ_NOTIFICATIONS_DESTINATION = "activemq.notifications";
-
    private final WireFormat marshaller;
 
    public OpenWireMessageConverter(WireFormat marshaller) {
@@ -516,7 +514,7 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
    public MessageDispatch createMessageDispatch(MessageReference reference,
                                                        ICoreMessage message,
                                                        AMQConsumer consumer) throws IOException, JMSException {
-      ActiveMQMessage amqMessage = toAMQMessage(reference, message, consumer.getOpenwireDestination());
+      ActiveMQMessage amqMessage = toAMQMessage(reference, message, consumer);
 
       //we can use core message id for sequenceId
       amqMessage.getMessageId().setBrokerSequenceId(message.getMessageID());
@@ -533,7 +531,7 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
 
    private ActiveMQMessage toAMQMessage(MessageReference reference,
                                                ICoreMessage coreMessage,
-                                               ActiveMQDestination actualDestination) throws IOException {
+                                               AMQConsumer consumer) throws IOException {
       ActiveMQMessage amqMsg = null;
       byte coreType = coreMessage.getType();
       switch (coreType) {
@@ -779,7 +777,7 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
          DataStructure ds = (DataStructure) marshaller.unmarshal(seq);
          amqMsg.setDataStructure(ds);
       }
-
+      final ActiveMQDestination actualDestination = consumer.getOpenwireDestination();
       amqMsg.setDestination(OpenWireUtil.toAMQAddress(coreMessage, actualDestination));
 
       Object value = coreMessage.getGroupID();
@@ -869,7 +867,7 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
          for (SimpleString s : props) {
             String keyStr = s.toString();
             if ((keyStr.startsWith("_AMQ") || keyStr.startsWith("__HDR_")) &&
-                    !(actualDestination.toString().contains(AMQ_NOTIFICATIONS_DESTINATION))) {
+                    !consumer.hasNotificationDestination()) {
                continue;
             }
             Object prop = coreMessage.getObjectProperty(s);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9650c80b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
index 77051cc..bfa51eb 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
@@ -53,8 +53,10 @@ import org.apache.activemq.command.MessagePull;
 import org.apache.activemq.command.RemoveInfo;
 
 public class AMQConsumer {
+   private static final String AMQ_NOTIFICATIONS_DESTINATION = "activemq.notifications";
    private AMQSession session;
-   private org.apache.activemq.command.ActiveMQDestination openwireDestination;
+   private final org.apache.activemq.command.ActiveMQDestination openwireDestination;
+   private final boolean hasNotificationDestination;
    private ConsumerInfo info;
    private final ScheduledExecutorService scheduledPool;
    private ServerConsumer serverConsumer;
@@ -74,6 +76,7 @@ public class AMQConsumer {
                       boolean internalAddress) {
       this.session = amqSession;
       this.openwireDestination = d;
+      this.hasNotificationDestination = d.toString().contains(AMQ_NOTIFICATIONS_DESTINATION);
       this.info = info;
       this.scheduledPool = scheduledPool;
       this.prefetchSize = info.getPrefetchSize();
@@ -331,6 +334,10 @@ public class AMQConsumer {
       serverConsumer.close(false);
    }
 
+   public boolean hasNotificationDestination() { // flag computed once in the constructor from the destination's string form
+      return hasNotificationDestination;
+   }
+
    public org.apache.activemq.command.ActiveMQDestination getOpenwireDestination() {
       return openwireDestination;
    }


[05/10] activemq-artemis git commit: ARTEMIS-1616 OpenWire improvements

Posted by cl...@apache.org.
ARTEMIS-1616 OpenWire improvements

Refactored OpenWireMessageConverter::inbound into smaller methods


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/54d01618
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/54d01618
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/54d01618

Branch: refs/heads/master
Commit: 54d0161850bc4543839d4f36b46f3bd84f5cd8e1
Parents: 2db4eaf
Author: Francesco Nigro <ni...@gmail.com>
Authored: Tue Jan 16 17:01:54 2018 +0100
Committer: Clebert Suconic <cl...@apache.org>
Committed: Mon Jan 22 18:02:03 2018 -0500

----------------------------------------------------------------------
 .../openwire/OpenWireMessageConverter.java      | 474 +++++++++++--------
 1 file changed, 280 insertions(+), 194 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/54d01618/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
index 83ff6d6..54f3c99 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
@@ -123,10 +123,10 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
 //   @Override
    public org.apache.activemq.artemis.api.core.Message inbound(Object message, CoreMessageObjectPools coreMessageObjectPools) throws Exception {
 
-      Message messageSend = (Message) message;
-      CoreMessage coreMessage = new CoreMessage(-1, messageSend.getSize(), coreMessageObjectPools);
+      final Message messageSend = (Message) message;
+      final CoreMessage coreMessage = new CoreMessage(-1, messageSend.getSize(), coreMessageObjectPools);
 
-      String type = messageSend.getType();
+      final String type = messageSend.getType();
       if (type != null) {
          coreMessage.putStringProperty(new SimpleString("JMSType"), new SimpleString(type));
       }
@@ -135,264 +135,350 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
       coreMessage.setPriority(messageSend.getPriority());
       coreMessage.setTimestamp(messageSend.getTimestamp());
 
-      byte coreType = toCoreType(messageSend.getDataStructureType());
+      final byte coreType = toCoreType(messageSend.getDataStructureType());
       coreMessage.setType(coreType);
 
-      ActiveMQBuffer body = coreMessage.getBodyBuffer();
+      final ActiveMQBuffer body = coreMessage.getBodyBuffer();
 
-      ByteSequence contents = messageSend.getContent();
+      final ByteSequence contents = messageSend.getContent();
       if (contents == null && coreType == org.apache.activemq.artemis.api.core.Message.TEXT_TYPE) {
          body.writeNullableString(null);
       } else if (contents != null) {
-         boolean messageCompressed = messageSend.isCompressed();
+         final boolean messageCompressed = messageSend.isCompressed();
          if (messageCompressed) {
             coreMessage.putBooleanProperty(AMQ_MSG_COMPRESSED, messageCompressed);
          }
 
          switch (coreType) {
             case org.apache.activemq.artemis.api.core.Message.TEXT_TYPE:
-               InputStream tis = new ByteArrayInputStream(contents);
-               if (messageCompressed) {
-                  tis = new InflaterInputStream(tis);
-               }
-               DataInputStream tdataIn = new DataInputStream(tis);
-               String text = MarshallingSupport.readUTF8(tdataIn);
-               tdataIn.close();
-               body.writeNullableSimpleString(new SimpleString(text));
+               writeTextType(contents, messageCompressed, body);
                break;
             case org.apache.activemq.artemis.api.core.Message.MAP_TYPE:
-               InputStream mis = new ByteArrayInputStream(contents);
-               if (messageCompressed) {
-                  mis = new InflaterInputStream(mis);
-               }
-               DataInputStream mdataIn = new DataInputStream(mis);
-               Map<String, Object> map = MarshallingSupport.unmarshalPrimitiveMap(mdataIn);
-               mdataIn.close();
-               TypedProperties props = new TypedProperties();
-               loadMapIntoProperties(props, map);
-               props.encode(body.byteBuf());
+               writeMapType(contents, messageCompressed, body);
                break;
             case org.apache.activemq.artemis.api.core.Message.OBJECT_TYPE:
-               if (messageCompressed) {
-                  try (InputStream ois = new InflaterInputStream(new ByteArrayInputStream(contents));
-                       org.apache.activemq.util.ByteArrayOutputStream decompressed = new org.apache.activemq.util.ByteArrayOutputStream()) {
-                     byte[] buf = new byte[1024];
-                     int n = ois.read(buf);
-                     while (n != -1) {
-                        decompressed.write(buf, 0, n);
-                        n = ois.read();
-                     }
-                     //read done
-                     contents = decompressed.toByteSequence();
-                  }
-               }
-               body.writeInt(contents.length);
-               body.writeBytes(contents.data, contents.offset, contents.length);
+               writeObjectType(contents, messageCompressed, body);
                break;
             case org.apache.activemq.artemis.api.core.Message.STREAM_TYPE:
-               InputStream sis = new ByteArrayInputStream(contents);
-               if (messageCompressed) {
-                  sis = new InflaterInputStream(sis);
-               }
-               DataInputStream sdis = new DataInputStream(sis);
-               int stype = sdis.read();
-               while (stype != -1) {
-                  switch (stype) {
-                     case MarshallingSupport.BOOLEAN_TYPE:
-                        body.writeByte(DataConstants.BOOLEAN);
-                        body.writeBoolean(sdis.readBoolean());
-                        break;
-                     case MarshallingSupport.BYTE_TYPE:
-                        body.writeByte(DataConstants.BYTE);
-                        body.writeByte(sdis.readByte());
-                        break;
-                     case MarshallingSupport.BYTE_ARRAY_TYPE:
-                        body.writeByte(DataConstants.BYTES);
-                        int slen = sdis.readInt();
-                        byte[] sbytes = new byte[slen];
-                        sdis.read(sbytes);
-                        body.writeInt(slen);
-                        body.writeBytes(sbytes);
-                        break;
-                     case MarshallingSupport.CHAR_TYPE:
-                        body.writeByte(DataConstants.CHAR);
-                        char schar = sdis.readChar();
-                        body.writeShort((short) schar);
-                        break;
-                     case MarshallingSupport.DOUBLE_TYPE:
-                        body.writeByte(DataConstants.DOUBLE);
-                        double sdouble = sdis.readDouble();
-                        body.writeLong(Double.doubleToLongBits(sdouble));
-                        break;
-                     case MarshallingSupport.FLOAT_TYPE:
-                        body.writeByte(DataConstants.FLOAT);
-                        float sfloat = sdis.readFloat();
-                        body.writeInt(Float.floatToIntBits(sfloat));
-                        break;
-                     case MarshallingSupport.INTEGER_TYPE:
-                        body.writeByte(DataConstants.INT);
-                        body.writeInt(sdis.readInt());
-                        break;
-                     case MarshallingSupport.LONG_TYPE:
-                        body.writeByte(DataConstants.LONG);
-                        body.writeLong(sdis.readLong());
-                        break;
-                     case MarshallingSupport.SHORT_TYPE:
-                        body.writeByte(DataConstants.SHORT);
-                        body.writeShort(sdis.readShort());
-                        break;
-                     case MarshallingSupport.STRING_TYPE:
-                        body.writeByte(DataConstants.STRING);
-                        String sstring = sdis.readUTF();
-                        body.writeNullableString(sstring);
-                        break;
-                     case MarshallingSupport.BIG_STRING_TYPE:
-                        body.writeByte(DataConstants.STRING);
-                        String sbigString = MarshallingSupport.readUTF8(sdis);
-                        body.writeNullableString(sbigString);
-                        break;
-                     case MarshallingSupport.NULL:
-                        body.writeByte(DataConstants.STRING);
-                        body.writeNullableString(null);
-                        break;
-                     default:
-                        //something we don't know, ignore
-                        break;
-                  }
-                  stype = sdis.read();
-               }
-               sdis.close();
+               writeStreamType(contents, messageCompressed, body);
                break;
             case org.apache.activemq.artemis.api.core.Message.BYTES_TYPE:
-               if (messageCompressed) {
-                  Inflater inflater = new Inflater();
-                  try (org.apache.activemq.util.ByteArrayOutputStream decompressed = new org.apache.activemq.util.ByteArrayOutputStream()) {
-                     int length = ByteSequenceData.readIntBig(contents);
-                     contents.offset = 0;
-                     byte[] data = Arrays.copyOfRange(contents.getData(), 4, contents.getLength());
-
-                     inflater.setInput(data);
-                     byte[] buffer = new byte[length];
-                     int count = inflater.inflate(buffer);
-                     decompressed.write(buffer, 0, count);
-                     contents = decompressed.toByteSequence();
-                  } catch (Exception e) {
-                     throw new IOException(e);
-                  } finally {
-                     inflater.end();
-                  }
-               }
-               body.writeBytes(contents.data, contents.offset, contents.length);
+               writeBytesType(contents, messageCompressed, body);
                break;
             default:
-               if (messageCompressed) {
-                  try (org.apache.activemq.util.ByteArrayOutputStream decompressed = new org.apache.activemq.util.ByteArrayOutputStream();
-                       OutputStream os = new InflaterOutputStream(decompressed)) {
-                     os.write(contents.data, contents.offset, contents.getLength());
-                     contents = decompressed.toByteSequence();
-                  } catch (Exception e) {
-                     throw new IOException(e);
-                  }
-               }
-               body.writeBytes(contents.data, contents.offset, contents.length);
+               writeDefaultType(contents, messageCompressed, body);
                break;
          }
       }
       //amq specific
       coreMessage.putLongProperty(AMQ_MSG_ARRIVAL, messageSend.getArrival());
       coreMessage.putLongProperty(AMQ_MSG_BROKER_IN_TIME, messageSend.getBrokerInTime());
-      BrokerId[] brokers = messageSend.getBrokerPath();
+      final BrokerId[] brokers = messageSend.getBrokerPath();
       if (brokers != null) {
-         StringBuilder builder = new StringBuilder();
-         for (int i = 0; i < brokers.length; i++) {
-            builder.append(brokers[i].getValue());
-            if (i != (brokers.length - 1)) {
-               builder.append(","); //is this separator safe?
-            }
-         }
-         coreMessage.putStringProperty(AMQ_MSG_BROKER_PATH, builder.toString());
+         putMsgBrokerPath(brokers, coreMessage);
       }
-      BrokerId[] cluster = messageSend.getCluster();
+      final BrokerId[] cluster = messageSend.getCluster();
       if (cluster != null) {
-         StringBuilder builder = new StringBuilder();
-         for (int i = 0; i < cluster.length; i++) {
-            builder.append(cluster[i].getValue());
-            if (i != (cluster.length - 1)) {
-               builder.append(","); //is this separator safe?
-            }
-         }
-         coreMessage.putStringProperty(AMQ_MSG_CLUSTER, builder.toString());
+         putMsgCluster(cluster, coreMessage);
       }
 
       coreMessage.putIntProperty(AMQ_MSG_COMMAND_ID, messageSend.getCommandId());
-      String corrId = messageSend.getCorrelationId();
+      final String corrId = messageSend.getCorrelationId();
       if (corrId != null) {
          coreMessage.putStringProperty("JMSCorrelationID", corrId);
       }
-      DataStructure ds = messageSend.getDataStructure();
+      final DataStructure ds = messageSend.getDataStructure();
       if (ds != null) {
-         ByteSequence dsBytes = marshaller.marshal(ds);
-         dsBytes.compact();
-         coreMessage.putBytesProperty(AMQ_MSG_DATASTRUCTURE, dsBytes.data);
+         putMsgDataStructure(ds, marshaller, coreMessage);
       }
-      String groupId = messageSend.getGroupID();
+      final String groupId = messageSend.getGroupID();
       if (groupId != null) {
          coreMessage.putStringProperty(AMQ_MSG_GROUP_ID, groupId);
       }
       coreMessage.putIntProperty(AMQ_MSG_GROUP_SEQUENCE, messageSend.getGroupSequence());
 
-      MessageId messageId = messageSend.getMessageId();
+      final MessageId messageId = messageSend.getMessageId();
 
-      ByteSequence midBytes = marshaller.marshal(messageId);
+      final ByteSequence midBytes = marshaller.marshal(messageId);
       midBytes.compact();
       coreMessage.putBytesProperty(AMQ_MSG_MESSAGE_ID, midBytes.data);
 
-      ProducerId producerId = messageSend.getProducerId();
+      final ProducerId producerId = messageSend.getProducerId();
       if (producerId != null) {
-         ByteSequence producerIdBytes = marshaller.marshal(producerId);
-         producerIdBytes.compact();
-         coreMessage.putBytesProperty(AMQ_MSG_PRODUCER_ID, producerIdBytes.data);
+         putMsgProducerId(producerId, marshaller, coreMessage);
       }
-      ByteSequence propBytes = messageSend.getMarshalledProperties();
+      final ByteSequence propBytes = messageSend.getMarshalledProperties();
       if (propBytes != null) {
-         propBytes.compact();
-         coreMessage.putBytesProperty(AMQ_MSG_MARSHALL_PROP, propBytes.data);
-         //unmarshall properties to core so selector will work
-         Map<String, Object> props = messageSend.getProperties();
-         //Map<String, Object> props = MarshallingSupport.unmarshalPrimitiveMap(new DataInputStream(new ByteArrayInputStream(propBytes)));
-         for (Entry<String, Object> ent : props.entrySet()) {
-            Object value = ent.getValue();
-            try {
-               coreMessage.putObjectProperty(ent.getKey(), value);
-            } catch (ActiveMQPropertyConversionException e) {
-               coreMessage.putStringProperty(ent.getKey(), value.toString());
-            }
-         }
+         putMsgMarshalledProperties(propBytes, messageSend, coreMessage);
       }
 
-      ActiveMQDestination replyTo = messageSend.getReplyTo();
+      final ActiveMQDestination replyTo = messageSend.getReplyTo();
       if (replyTo != null) {
-         ByteSequence replyToBytes = marshaller.marshal(replyTo);
-         replyToBytes.compact();
-         coreMessage.putBytesProperty(AMQ_MSG_REPLY_TO, replyToBytes.data);
+         putMsgReplyTo(replyTo, marshaller, coreMessage);
       }
 
-      String userId = messageSend.getUserID();
+      final String userId = messageSend.getUserID();
       if (userId != null) {
          coreMessage.putStringProperty(AMQ_MSG_USER_ID, userId);
       }
       coreMessage.putBooleanProperty(AMQ_MSG_DROPPABLE, messageSend.isDroppable());
 
-      ActiveMQDestination origDest = messageSend.getOriginalDestination();
+      final ActiveMQDestination origDest = messageSend.getOriginalDestination();
       if (origDest != null) {
-         ByteSequence origDestBytes = marshaller.marshal(origDest);
-         origDestBytes.compact();
-         coreMessage.putBytesProperty(AMQ_MSG_ORIG_DESTINATION, origDestBytes.data);
+         putMsgOriginalDestination(origDest, marshaller, coreMessage);
       }
 
       return coreMessage;
    }
 
+   private static void writeTextType(final ByteSequence contents,
+                                     final boolean messageCompressed,
+                                     final ActiveMQBuffer body) throws IOException {
+      // Decodes the OpenWire UTF-8 text payload (inflating it first when compressed) into the core body.
+      final InputStream raw = new ByteArrayInputStream(contents);
+      final String text;
+      // try-with-resources closes the stream (and any Inflater behind it) even when decoding throws
+      try (DataInputStream tdataIn = new DataInputStream(messageCompressed ? new InflaterInputStream(raw) : raw)) {
+         text = MarshallingSupport.readUTF8(tdataIn);
+      }
+      body.writeNullableSimpleString(new SimpleString(text));
+   }
+
+   private static void writeMapType(final ByteSequence contents,
+                                    final boolean messageCompressed,
+                                    final ActiveMQBuffer body) throws IOException {
+      // Unmarshals the OpenWire primitive map (inflating when compressed) and re-encodes it as TypedProperties.
+      final InputStream raw = new ByteArrayInputStream(contents);
+      final Map<String, Object> map;
+      // try-with-resources closes the stream (and any Inflater behind it) even when unmarshalling throws
+      try (DataInputStream mdataIn = new DataInputStream(messageCompressed ? new InflaterInputStream(raw) : raw)) {
+         map = MarshallingSupport.unmarshalPrimitiveMap(mdataIn);
+      }
+      TypedProperties props = new TypedProperties();
+      loadMapIntoProperties(props, map);
+      props.encode(body.byteBuf());
+   }
+
+   private static void writeObjectType(ByteSequence contents,
+                                       final boolean messageCompressed,
+                                       final ActiveMQBuffer body) throws IOException {
+      if (messageCompressed) {
+         contents = writeCompressedObjectType(contents); // swap in the inflated payload
+      }
+      body.writeInt(contents.length); // length prefix first
+      body.writeBytes(contents.data, contents.offset, contents.length); // then the payload bytes
+   }
+
+   private static ByteSequence writeCompressedObjectType(final ByteSequence contents) throws IOException {
+      // Inflates a compressed OpenWire object payload and returns the decompressed bytes.
+      try (InputStream ois = new InflaterInputStream(new ByteArrayInputStream(contents));
+           org.apache.activemq.util.ByteArrayOutputStream decompressed = new org.apache.activemq.util.ByteArrayOutputStream()) {
+         final byte[] buf = new byte[1024];
+         int n;
+         // Refill buf on every pass: the no-arg read() returns one byte VALUE, not a count usable as a length.
+         while ((n = ois.read(buf)) != -1) {
+            decompressed.write(buf, 0, n);
+         }
+         return decompressed.toByteSequence();
+      }
+   }
+
+   private static void writeStreamType(final ByteSequence contents,
+                                       final boolean messageCompressed,
+                                       final ActiveMQBuffer body) throws IOException {
+      // Re-encodes an OpenWire stream payload for the core body: each element's MarshallingSupport type
+      // tag is replaced by the matching DataConstants tag, followed by the element's value.
+      final InputStream raw = new ByteArrayInputStream(contents);
+      // try-with-resources closes the stream (and any Inflater behind it) even when decoding throws
+      try (DataInputStream sdis = new DataInputStream(messageCompressed ? new InflaterInputStream(raw) : raw)) {
+         int stype = sdis.read();
+         while (stype != -1) {
+            switch (stype) {
+               case MarshallingSupport.BOOLEAN_TYPE:
+                  body.writeByte(DataConstants.BOOLEAN);
+                  body.writeBoolean(sdis.readBoolean());
+                  break;
+               case MarshallingSupport.BYTE_TYPE:
+                  body.writeByte(DataConstants.BYTE);
+                  body.writeByte(sdis.readByte());
+                  break;
+               case MarshallingSupport.BYTE_ARRAY_TYPE:
+                  body.writeByte(DataConstants.BYTES);
+                  int slen = sdis.readInt();
+                  byte[] sbytes = new byte[slen];
+                  sdis.readFully(sbytes); // read() may return short (notably when inflating); readFully fills the array or throws
+                  body.writeInt(slen);
+                  body.writeBytes(sbytes);
+                  break;
+               case MarshallingSupport.CHAR_TYPE:
+                  body.writeByte(DataConstants.CHAR);
+                  char schar = sdis.readChar();
+                  body.writeShort((short) schar); // char is written as its 16-bit value
+                  break;
+               case MarshallingSupport.DOUBLE_TYPE:
+                  body.writeByte(DataConstants.DOUBLE);
+                  double sdouble = sdis.readDouble();
+                  body.writeLong(Double.doubleToLongBits(sdouble)); // double is written as its raw long bits
+                  break;
+               case MarshallingSupport.FLOAT_TYPE:
+                  body.writeByte(DataConstants.FLOAT);
+                  float sfloat = sdis.readFloat();
+                  body.writeInt(Float.floatToIntBits(sfloat)); // float is written as its raw int bits
+                  break;
+               case MarshallingSupport.INTEGER_TYPE:
+                  body.writeByte(DataConstants.INT);
+                  body.writeInt(sdis.readInt());
+                  break;
+               case MarshallingSupport.LONG_TYPE:
+                  body.writeByte(DataConstants.LONG);
+                  body.writeLong(sdis.readLong());
+                  break;
+               case MarshallingSupport.SHORT_TYPE:
+                  body.writeByte(DataConstants.SHORT);
+                  body.writeShort(sdis.readShort());
+                  break;
+               case MarshallingSupport.STRING_TYPE:
+                  body.writeByte(DataConstants.STRING);
+                  String sstring = sdis.readUTF();
+                  body.writeNullableString(sstring);
+                  break;
+               case MarshallingSupport.BIG_STRING_TYPE:
+                  body.writeByte(DataConstants.STRING);
+                  String sbigString = MarshallingSupport.readUTF8(sdis);
+                  body.writeNullableString(sbigString);
+                  break;
+               case MarshallingSupport.NULL:
+                  body.writeByte(DataConstants.STRING); // a null element is emitted as a null string
+                  body.writeNullableString(null);
+                  break;
+               default:
+                  //something we don't know, ignore
+                  break;
+            }
+            stype = sdis.read();
+         }
+      }
+   }
+
+   private static void writeBytesType(ByteSequence contents,
+                                      final boolean messageCompressed,
+                                      final ActiveMQBuffer body) throws IOException {
+      if (messageCompressed) {
+         contents = writeCompressedBytesType(contents); // swap in the inflated payload
+      }
+      body.writeBytes(contents.data, contents.offset, contents.length); // payload bytes only, no length prefix
+   }
+
+   private static ByteSequence writeCompressedBytesType(final ByteSequence contents) throws IOException {
+      Inflater inflater = new Inflater();
+      try (org.apache.activemq.util.ByteArrayOutputStream decompressed = new org.apache.activemq.util.ByteArrayOutputStream()) {
+         int length = ByteSequenceData.readIntBig(contents); // leading int is used below as the size of the inflate buffer
+         contents.offset = 0; // NOTE(review): mutates the caller's sequence and presumes it started at offset 0 - confirm
+         byte[] data = Arrays.copyOfRange(contents.getData(), 4, contents.getLength()); // bytes after the 4-byte prefix
+
+         inflater.setInput(data);
+         byte[] buffer = new byte[length];
+         int count = inflater.inflate(buffer); // single inflate call; count is the number of bytes actually produced
+         decompressed.write(buffer, 0, count);
+         return decompressed.toByteSequence();
+      } catch (Exception e) {
+         throw new IOException(e); // any failure is rethrown as IOException with the original as cause
+      } finally {
+         inflater.end(); // always release the Inflater
+      }
+   }
+
+   private static void writeDefaultType(ByteSequence contents,
+                                        final boolean messageCompressed,
+                                        final ActiveMQBuffer body) throws IOException {
+      if (messageCompressed) {
+         contents = writeCompressedDefaultType(contents); // swap in the inflated payload
+      }
+      body.writeBytes(contents.data, contents.offset, contents.length); // payload bytes only, no length prefix
+   }
+
+   private static ByteSequence writeCompressedDefaultType(final ByteSequence contents) throws IOException {
+      try (org.apache.activemq.util.ByteArrayOutputStream decompressed = new org.apache.activemq.util.ByteArrayOutputStream();
+           OutputStream os = new InflaterOutputStream(decompressed)) { // bytes written to os come out inflated in decompressed
+         os.write(contents.data, contents.offset, contents.getLength());
+         return decompressed.toByteSequence(); // NOTE(review): taken before os is closed/finished - confirm nothing is still pending
+      } catch (Exception e) {
+         throw new IOException(e); // any failure is rethrown as IOException with the original as cause
+      }
+   }
+
+   private static void putMsgBrokerPath(final BrokerId[] brokers, final CoreMessage coreMessage) {
+      final StringBuilder builder = new StringBuilder(); // accumulates the comma-separated broker id values
+      for (int i = 0, size = brokers.length; i < size; i++) {
+         builder.append(brokers[i].getValue());
+         if (i != (size - 1)) { // no separator after the last element
+            builder.append(','); //is this separator safe?
+         }
+      }
+      coreMessage.putStringProperty(AMQ_MSG_BROKER_PATH, builder.toString()); // an empty array is stored as ""
+   }
+
+   private static void putMsgCluster(final BrokerId[] cluster, final CoreMessage coreMessage) {
+      final StringBuilder builder = new StringBuilder(); // accumulates the comma-separated cluster broker id values
+      for (int i = 0, size = cluster.length; i < size; i++) {
+         builder.append(cluster[i].getValue());
+         if (i != (size - 1)) { // no separator after the last element
+            builder.append(','); //is this separator safe?
+         }
+      }
+      coreMessage.putStringProperty(AMQ_MSG_CLUSTER, builder.toString()); // an empty array is stored as ""
+   }
+
+   private static void putMsgDataStructure(final DataStructure ds,
+                                           final WireFormat marshaller,
+                                           final CoreMessage coreMessage) throws IOException {
+      final ByteSequence dsBytes = marshaller.marshal(ds); // wire-format encoding of the data structure
+      dsBytes.compact(); // NOTE(review): presumably trims data to exactly the used bytes - confirm
+      coreMessage.putBytesProperty(AMQ_MSG_DATASTRUCTURE, dsBytes.data); // stored as a bytes property on the core message
+   }
+
+   private static void putMsgProducerId(final ProducerId producerId,
+                                        final WireFormat marshaller,
+                                        final CoreMessage coreMessage) throws IOException {
+      final ByteSequence producerIdBytes = marshaller.marshal(producerId); // wire-format encoding of the producer id
+      producerIdBytes.compact(); // NOTE(review): presumably trims data to exactly the used bytes - confirm
+      coreMessage.putBytesProperty(AMQ_MSG_PRODUCER_ID, producerIdBytes.data); // stored as a bytes property on the core message
+   }
+
+   private static void putMsgMarshalledProperties(final ByteSequence propBytes,
+                                                  final Message messageSend,
+                                                  final CoreMessage coreMessage) throws IOException {
+      propBytes.compact(); // NOTE(review): presumably trims data to exactly the used bytes - confirm
+      coreMessage.putBytesProperty(AMQ_MSG_MARSHALL_PROP, propBytes.data); // keep the marshalled form as a bytes property
+      //unmarshall properties to core so selector will work
+      final Map<String, Object> props = messageSend.getProperties();
+      if (!props.isEmpty()) {
+         props.forEach((key, value) -> { // copy every property onto the core message
+            try {
+               coreMessage.putObjectProperty(key, value);
+            } catch (ActiveMQPropertyConversionException e) {
+               coreMessage.putStringProperty(key, value.toString()); // fall back to the string form when conversion fails
+            }
+         });
+      }
+   }
+
+   private static void putMsgReplyTo(final ActiveMQDestination replyTo,
+                                     final WireFormat marshaller,
+                                     final CoreMessage coreMessage) throws IOException {
+      final ByteSequence replyToBytes = marshaller.marshal(replyTo); // wire-format encoding of the reply-to destination
+      replyToBytes.compact(); // NOTE(review): presumably trims data to exactly the used bytes - confirm
+      coreMessage.putBytesProperty(AMQ_MSG_REPLY_TO, replyToBytes.data); // stored as a bytes property on the core message
+   }
+
+   private static void putMsgOriginalDestination(final ActiveMQDestination origDest,
+                                                 final WireFormat marshaller,
+                                                 final CoreMessage coreMessage) throws IOException {
+      final ByteSequence origDestBytes = marshaller.marshal(origDest); // wire-format encoding of the original destination
+      origDestBytes.compact(); // NOTE(review): presumably trims data to exactly the used bytes - confirm
+      coreMessage.putBytesProperty(AMQ_MSG_ORIG_DESTINATION, origDestBytes.data); // stored as a bytes property on the core message
+   }
+
    private static void loadMapIntoProperties(TypedProperties props, Map<String, Object> map) {
       for (Entry<String, Object> entry : map.entrySet()) {
          SimpleString key = new SimpleString(entry.getKey());


[06/10] activemq-artemis git commit: ARTEMIS-1616 OpenWire improvements

Posted by cl...@apache.org.
ARTEMIS-1616 OpenWire improvements

Used SimpleString on OpenWireMessageConverter to avoid translations on CoreMessage


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/64724c35
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/64724c35
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/64724c35

Branch: refs/heads/master
Commit: 64724c3520586c6cd1bc0aec9942ae5bb5562459
Parents: 17c0a33
Author: Francesco Nigro <ni...@gmail.com>
Authored: Tue Jan 16 22:24:08 2018 +0100
Committer: Clebert Suconic <cl...@apache.org>
Committed: Mon Jan 22 18:02:03 2018 -0500

----------------------------------------------------------------------
 .../openwire/OpenWireMessageConverter.java      | 331 ++++++++++---------
 .../core/protocol/openwire/amq/AMQConsumer.java |   2 +-
 2 files changed, 167 insertions(+), 166 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64724c35/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
index 54f3c99..457593d 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
@@ -71,29 +71,31 @@ import org.fusesource.hawtbuf.UTF8Buffer;
 
 public class OpenWireMessageConverter implements MessageConverter<OpenwireMessage> {
 
-   public static final String AMQ_PREFIX = "__HDR_";
-   public static final String AMQ_MSG_DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY = AMQ_PREFIX + "dlqDeliveryFailureCause";
-
-   private static final String AMQ_MSG_ARRIVAL = AMQ_PREFIX + "ARRIVAL";
-   private static final String AMQ_MSG_BROKER_IN_TIME = AMQ_PREFIX + "BROKER_IN_TIME";
-
-   private static final String AMQ_MSG_BROKER_PATH = AMQ_PREFIX + "BROKER_PATH";
-   private static final String AMQ_MSG_CLUSTER = AMQ_PREFIX + "CLUSTER";
-   private static final String AMQ_MSG_COMMAND_ID = AMQ_PREFIX + "COMMAND_ID";
-   private static final String AMQ_MSG_DATASTRUCTURE = AMQ_PREFIX + "DATASTRUCTURE";
-   private static final String AMQ_MSG_GROUP_ID = org.apache.activemq.artemis.api.core.Message.HDR_GROUP_ID.toString();
-   private static final String AMQ_MSG_GROUP_SEQUENCE = AMQ_PREFIX + "GROUP_SEQUENCE";
-   private static final String AMQ_MSG_MESSAGE_ID = AMQ_PREFIX + "MESSAGE_ID";
-   private static final String AMQ_MSG_ORIG_DESTINATION = AMQ_PREFIX + "ORIG_DESTINATION";
-   private static final String AMQ_MSG_ORIG_TXID = AMQ_PREFIX + "ORIG_TXID";
-   private static final String AMQ_MSG_PRODUCER_ID = AMQ_PREFIX + "PRODUCER_ID";
-   private static final String AMQ_MSG_MARSHALL_PROP = AMQ_PREFIX + "MARSHALL_PROP";
-   private static final String AMQ_MSG_REPLY_TO = AMQ_PREFIX + "REPLY_TO";
-
-   private static final String AMQ_MSG_USER_ID = AMQ_PREFIX + "USER_ID";
-
-   private static final String AMQ_MSG_DROPPABLE = AMQ_PREFIX + "DROPPABLE";
-   private static final String AMQ_MSG_COMPRESSED = AMQ_PREFIX + "COMPRESSED";
+   private static final SimpleString JMS_TYPE_PROPERTY = new SimpleString("JMSType");
+   private static final SimpleString JMS_CORRELATION_ID_PROPERTY = new SimpleString("JMSCorrelationID");
+   private static final SimpleString AMQ_PREFIX = new SimpleString("__HDR_");
+   public static final SimpleString AMQ_MSG_DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY = new SimpleString(AMQ_PREFIX + "dlqDeliveryFailureCause");
+
+   private static final SimpleString AMQ_MSG_ARRIVAL = new SimpleString(AMQ_PREFIX + "ARRIVAL");
+   private static final SimpleString AMQ_MSG_BROKER_IN_TIME = new SimpleString(AMQ_PREFIX + "BROKER_IN_TIME");
+
+   private static final SimpleString AMQ_MSG_BROKER_PATH = new SimpleString(AMQ_PREFIX + "BROKER_PATH");
+   private static final SimpleString AMQ_MSG_CLUSTER = new SimpleString(AMQ_PREFIX + "CLUSTER");
+   private static final SimpleString AMQ_MSG_COMMAND_ID = new SimpleString(AMQ_PREFIX + "COMMAND_ID");
+   private static final SimpleString AMQ_MSG_DATASTRUCTURE = new SimpleString(AMQ_PREFIX + "DATASTRUCTURE");
+   private static final SimpleString AMQ_MSG_GROUP_ID = org.apache.activemq.artemis.api.core.Message.HDR_GROUP_ID;
+   private static final SimpleString AMQ_MSG_GROUP_SEQUENCE = new SimpleString(AMQ_PREFIX + "GROUP_SEQUENCE");
+   private static final SimpleString AMQ_MSG_MESSAGE_ID = new SimpleString(AMQ_PREFIX + "MESSAGE_ID");
+   private static final SimpleString AMQ_MSG_ORIG_DESTINATION =  new SimpleString(AMQ_PREFIX + "ORIG_DESTINATION");
+   private static final SimpleString AMQ_MSG_ORIG_TXID = new SimpleString(AMQ_PREFIX + "ORIG_TXID");
+   private static final SimpleString AMQ_MSG_PRODUCER_ID =  new SimpleString(AMQ_PREFIX + "PRODUCER_ID");
+   private static final SimpleString AMQ_MSG_MARSHALL_PROP = new SimpleString(AMQ_PREFIX + "MARSHALL_PROP");
+   private static final SimpleString AMQ_MSG_REPLY_TO = new SimpleString(AMQ_PREFIX + "REPLY_TO");
+
+   private static final SimpleString AMQ_MSG_USER_ID = new SimpleString(AMQ_PREFIX + "USER_ID");
+
+   private static final SimpleString AMQ_MSG_DROPPABLE =  new SimpleString(AMQ_PREFIX + "DROPPABLE");
+   private static final SimpleString AMQ_MSG_COMPRESSED = new SimpleString(AMQ_PREFIX + "COMPRESSED");
 
    private static final String AMQ_NOTIFICATIONS_DESTINATION = "activemq.notifications";
 
@@ -128,7 +130,7 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
 
       final String type = messageSend.getType();
       if (type != null) {
-         coreMessage.putStringProperty(new SimpleString("JMSType"), new SimpleString(type));
+         coreMessage.putStringProperty(JMS_TYPE_PROPERTY, new SimpleString(type));
       }
       coreMessage.setDurable(messageSend.isPersistent());
       coreMessage.setExpiration(messageSend.getExpiration());
@@ -185,7 +187,7 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
       coreMessage.putIntProperty(AMQ_MSG_COMMAND_ID, messageSend.getCommandId());
       final String corrId = messageSend.getCorrelationId();
       if (corrId != null) {
-         coreMessage.putStringProperty("JMSCorrelationID", corrId);
+         coreMessage.putStringProperty(JMS_CORRELATION_ID_PROPERTY, new SimpleString(corrId));
       }
       final DataStructure ds = messageSend.getDataStructure();
       if (ds != null) {
@@ -193,7 +195,7 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
       }
       final String groupId = messageSend.getGroupID();
       if (groupId != null) {
-         coreMessage.putStringProperty(AMQ_MSG_GROUP_ID, groupId);
+         coreMessage.putStringProperty(AMQ_MSG_GROUP_ID, new SimpleString(groupId));
       }
       coreMessage.putIntProperty(AMQ_MSG_GROUP_SEQUENCE, messageSend.getGroupSequence());
 
@@ -219,7 +221,7 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
 
       final String userId = messageSend.getUserID();
       if (userId != null) {
-         coreMessage.putStringProperty(AMQ_MSG_USER_ID, userId);
+         coreMessage.putStringProperty(AMQ_MSG_USER_ID, new SimpleString(userId));
       }
       coreMessage.putBooleanProperty(AMQ_MSG_DROPPABLE, messageSend.isDroppable());
 
@@ -415,7 +417,7 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
             builder.append(','); //is this separator safe?
          }
       }
-      coreMessage.putStringProperty(AMQ_MSG_BROKER_PATH, builder.toString());
+      coreMessage.putStringProperty(AMQ_MSG_BROKER_PATH, new SimpleString(builder.toString()));
    }
 
    private static void putMsgCluster(final BrokerId[] cluster, final CoreMessage coreMessage) {
@@ -426,7 +428,7 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
             builder.append(','); //is this separator safe?
          }
       }
-      coreMessage.putStringProperty(AMQ_MSG_CLUSTER, builder.toString());
+      coreMessage.putStringProperty(AMQ_MSG_CLUSTER, new SimpleString(builder.toString()));
    }
 
    private static void putMsgDataStructure(final DataStructure ds,
@@ -557,7 +559,7 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
             throw new IllegalStateException("Unknown message type: " + coreMessage.getType());
       }
 
-      String type = coreMessage.getStringProperty(new SimpleString("JMSType"));
+      String type = coreMessage.getStringProperty(JMS_TYPE_PROPERTY);
       if (type != null) {
          amqMsg.setJMSType(type);
       }
@@ -580,156 +582,155 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
       byte[] bytes = null;
       if (buffer != null) {
          buffer.resetReaderIndex();
-         synchronized (buffer) {
-            if (coreType == org.apache.activemq.artemis.api.core.Message.TEXT_TYPE) {
-               SimpleString text = buffer.readNullableSimpleString();
-               if (text != null) {
-                  ByteArrayOutputStream bytesOut = new ByteArrayOutputStream(text.length() + 4);
-                  OutputStream out = bytesOut;
-                  if (isCompressed) {
-                     out = new DeflaterOutputStream(out, true);
-                  }
-                  try (DataOutputStream dataOut = new DataOutputStream(out)) {
-                     MarshallingSupport.writeUTF8(dataOut, text.toString());
-                     dataOut.flush();
-                     bytes = bytesOut.toByteArray();
-                  }
-               }
-            } else if (coreType == org.apache.activemq.artemis.api.core.Message.MAP_TYPE) {
-               TypedProperties mapData = new TypedProperties();
-               //it could be a null map
-               if (buffer.readableBytes() > 0) {
-                  mapData.decode(buffer.byteBuf());
-                  Map<String, Object> map = mapData.getMap();
-                  ByteArrayOutputStream out = new ByteArrayOutputStream(mapData.getEncodeSize());
-                  OutputStream os = out;
-                  if (isCompressed) {
-                     os = new DeflaterOutputStream(os, true);
-                  }
-                  try (DataOutputStream dataOut = new DataOutputStream(os)) {
-                     MarshallingSupport.marshalPrimitiveMap(map, dataOut);
-                     dataOut.flush();
-                  }
-                  bytes = out.toByteArray();
-               }
 
-            } else if (coreType == org.apache.activemq.artemis.api.core.Message.OBJECT_TYPE) {
-               if (buffer.readableBytes() > 0) {
-                  int len = buffer.readInt();
-                  bytes = new byte[len];
-                  buffer.readBytes(bytes);
-                  if (isCompressed) {
-                     ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
-                     try (DeflaterOutputStream out = new DeflaterOutputStream(bytesOut, true)) {
-                        out.write(bytes);
-                        out.flush();
-                     }
-                     bytes = bytesOut.toByteArray();
-                  }
-               }
-            } else if (coreType == org.apache.activemq.artemis.api.core.Message.STREAM_TYPE) {
-               org.apache.activemq.util.ByteArrayOutputStream bytesOut = new org.apache.activemq.util.ByteArrayOutputStream();
+         if (coreType == org.apache.activemq.artemis.api.core.Message.TEXT_TYPE) {
+            SimpleString text = buffer.readNullableSimpleString();
+            if (text != null) {
+               ByteArrayOutputStream bytesOut = new ByteArrayOutputStream(text.length() + 4);
                OutputStream out = bytesOut;
                if (isCompressed) {
-                  out = new DeflaterOutputStream(bytesOut, true);
+                  out = new DeflaterOutputStream(out, true);
                }
                try (DataOutputStream dataOut = new DataOutputStream(out)) {
-
-                  boolean stop = false;
-                  while (!stop && buffer.readable()) {
-                     byte primitiveType = buffer.readByte();
-                     switch (primitiveType) {
-                        case DataConstants.BOOLEAN:
-                           MarshallingSupport.marshalBoolean(dataOut, buffer.readBoolean());
-                           break;
-                        case DataConstants.BYTE:
-                           MarshallingSupport.marshalByte(dataOut, buffer.readByte());
-                           break;
-                        case DataConstants.BYTES:
-                           int len = buffer.readInt();
-                           byte[] bytesData = new byte[len];
-                           buffer.readBytes(bytesData);
-                           MarshallingSupport.marshalByteArray(dataOut, bytesData);
-                           break;
-                        case DataConstants.CHAR:
-                           char ch = (char) buffer.readShort();
-                           MarshallingSupport.marshalChar(dataOut, ch);
-                           break;
-                        case DataConstants.DOUBLE:
-                           double doubleVal = Double.longBitsToDouble(buffer.readLong());
-                           MarshallingSupport.marshalDouble(dataOut, doubleVal);
-                           break;
-                        case DataConstants.FLOAT:
-                           Float floatVal = Float.intBitsToFloat(buffer.readInt());
-                           MarshallingSupport.marshalFloat(dataOut, floatVal);
-                           break;
-                        case DataConstants.INT:
-                           MarshallingSupport.marshalInt(dataOut, buffer.readInt());
-                           break;
-                        case DataConstants.LONG:
-                           MarshallingSupport.marshalLong(dataOut, buffer.readLong());
-                           break;
-                        case DataConstants.SHORT:
-                           MarshallingSupport.marshalShort(dataOut, buffer.readShort());
-                           break;
-                        case DataConstants.STRING:
-                           String string = buffer.readNullableString();
-                           if (string == null) {
-                              MarshallingSupport.marshalNull(dataOut);
-                           } else {
-                              MarshallingSupport.marshalString(dataOut, string);
-                           }
-                           break;
-                        default:
-                           //now we stop
-                           stop = true;
-                           break;
-                     }
-                     dataOut.flush();
-                  }
+                  MarshallingSupport.writeUTF8(dataOut, text.toString());
+                  dataOut.flush();
+                  bytes = bytesOut.toByteArray();
                }
-               bytes = bytesOut.toByteArray();
-            } else if (coreType == org.apache.activemq.artemis.api.core.Message.BYTES_TYPE) {
-               int n = buffer.readableBytes();
-               bytes = new byte[n];
-               buffer.readBytes(bytes);
+            }
+         } else if (coreType == org.apache.activemq.artemis.api.core.Message.MAP_TYPE) {
+            TypedProperties mapData = new TypedProperties();
+            //it could be a null map
+            if (buffer.readableBytes() > 0) {
+               mapData.decode(buffer.byteBuf());
+               Map<String, Object> map = mapData.getMap();
+               ByteArrayOutputStream out = new ByteArrayOutputStream(mapData.getEncodeSize());
+               OutputStream os = out;
                if (isCompressed) {
-                  int length = bytes.length;
-                  Deflater deflater = new Deflater();
-                  try (org.apache.activemq.util.ByteArrayOutputStream compressed = new org.apache.activemq.util.ByteArrayOutputStream()) {
-                     compressed.write(new byte[4]);
-                     deflater.setInput(bytes);
-                     deflater.finish();
-                     byte[] bytesBuf = new byte[1024];
-                     while (!deflater.finished()) {
-                        int count = deflater.deflate(bytesBuf);
-                        compressed.write(bytesBuf, 0, count);
-                     }
-                     compressed.flush();
-                     ByteSequence byteSeq = compressed.toByteSequence();
-                     ByteSequenceData.writeIntBig(byteSeq, length);
-                     bytes = Arrays.copyOfRange(byteSeq.data, 0, byteSeq.length);
-                  } finally {
-                     deflater.end();
-                  }
+                  os = new DeflaterOutputStream(os, true);
+               }
+               try (DataOutputStream dataOut = new DataOutputStream(os)) {
+                  MarshallingSupport.marshalPrimitiveMap(map, dataOut);
+                  dataOut.flush();
                }
-            } else {
-               int n = buffer.readableBytes();
-               bytes = new byte[n];
+               bytes = out.toByteArray();
+            }
+
+         } else if (coreType == org.apache.activemq.artemis.api.core.Message.OBJECT_TYPE) {
+            if (buffer.readableBytes() > 0) {
+               int len = buffer.readInt();
+               bytes = new byte[len];
                buffer.readBytes(bytes);
                if (isCompressed) {
-                  try (ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
-                       DeflaterOutputStream out = new DeflaterOutputStream(bytesOut, true)) {
+                  ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
+                  try (DeflaterOutputStream out = new DeflaterOutputStream(bytesOut, true)) {
                      out.write(bytes);
                      out.flush();
-                     bytes = bytesOut.toByteArray();
                   }
+                  bytes = bytesOut.toByteArray();
+               }
+            }
+         } else if (coreType == org.apache.activemq.artemis.api.core.Message.STREAM_TYPE) {
+            org.apache.activemq.util.ByteArrayOutputStream bytesOut = new org.apache.activemq.util.ByteArrayOutputStream();
+            OutputStream out = bytesOut;
+            if (isCompressed) {
+               out = new DeflaterOutputStream(bytesOut, true);
+            }
+            try (DataOutputStream dataOut = new DataOutputStream(out)) {
+
+               boolean stop = false;
+               while (!stop && buffer.readable()) {
+                  byte primitiveType = buffer.readByte();
+                  switch (primitiveType) {
+                     case DataConstants.BOOLEAN:
+                        MarshallingSupport.marshalBoolean(dataOut, buffer.readBoolean());
+                        break;
+                     case DataConstants.BYTE:
+                        MarshallingSupport.marshalByte(dataOut, buffer.readByte());
+                        break;
+                     case DataConstants.BYTES:
+                        int len = buffer.readInt();
+                        byte[] bytesData = new byte[len];
+                        buffer.readBytes(bytesData);
+                        MarshallingSupport.marshalByteArray(dataOut, bytesData);
+                        break;
+                     case DataConstants.CHAR:
+                        char ch = (char) buffer.readShort();
+                        MarshallingSupport.marshalChar(dataOut, ch);
+                        break;
+                     case DataConstants.DOUBLE:
+                        double doubleVal = Double.longBitsToDouble(buffer.readLong());
+                        MarshallingSupport.marshalDouble(dataOut, doubleVal);
+                        break;
+                     case DataConstants.FLOAT:
+                        Float floatVal = Float.intBitsToFloat(buffer.readInt());
+                        MarshallingSupport.marshalFloat(dataOut, floatVal);
+                        break;
+                     case DataConstants.INT:
+                        MarshallingSupport.marshalInt(dataOut, buffer.readInt());
+                        break;
+                     case DataConstants.LONG:
+                        MarshallingSupport.marshalLong(dataOut, buffer.readLong());
+                        break;
+                     case DataConstants.SHORT:
+                        MarshallingSupport.marshalShort(dataOut, buffer.readShort());
+                        break;
+                     case DataConstants.STRING:
+                        String string = buffer.readNullableString();
+                        if (string == null) {
+                           MarshallingSupport.marshalNull(dataOut);
+                        } else {
+                           MarshallingSupport.marshalString(dataOut, string);
+                        }
+                        break;
+                     default:
+                        //now we stop
+                        stop = true;
+                        break;
+                  }
+                  dataOut.flush();
+               }
+            }
+            bytes = bytesOut.toByteArray();
+         } else if (coreType == org.apache.activemq.artemis.api.core.Message.BYTES_TYPE) {
+            int n = buffer.readableBytes();
+            bytes = new byte[n];
+            buffer.readBytes(bytes);
+            if (isCompressed) {
+               int length = bytes.length;
+               Deflater deflater = new Deflater();
+               try (org.apache.activemq.util.ByteArrayOutputStream compressed = new org.apache.activemq.util.ByteArrayOutputStream()) {
+                  compressed.write(new byte[4]);
+                  deflater.setInput(bytes);
+                  deflater.finish();
+                  byte[] bytesBuf = new byte[1024];
+                  while (!deflater.finished()) {
+                     int count = deflater.deflate(bytesBuf);
+                     compressed.write(bytesBuf, 0, count);
+                  }
+                  compressed.flush();
+                  ByteSequence byteSeq = compressed.toByteSequence();
+                  ByteSequenceData.writeIntBig(byteSeq, length);
+                  bytes = Arrays.copyOfRange(byteSeq.data, 0, byteSeq.length);
+               } finally {
+                  deflater.end();
+               }
+            }
+         } else {
+            int n = buffer.readableBytes();
+            bytes = new byte[n];
+            buffer.readBytes(bytes);
+            if (isCompressed) {
+               try (ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
+                    DeflaterOutputStream out = new DeflaterOutputStream(bytesOut, true)) {
+                  out.write(bytes);
+                  out.flush();
+                  bytes = bytesOut.toByteArray();
                }
             }
-
-            buffer.resetReaderIndex();// this is important for topics as the buffer
-            // may be read multiple times
          }
+
+         buffer.resetReaderIndex();// this is important for topics as the buffer
+         // may be read multiple times
       }
 
       //we need check null because messages may come from other clients
@@ -767,7 +768,7 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
       }
       amqMsg.setCommandId(commandId);
 
-      SimpleString corrId = (SimpleString) coreMessage.getObjectProperty("JMSCorrelationID");
+      SimpleString corrId = (SimpleString) coreMessage.getObjectProperty(JMS_CORRELATION_ID_PROPERTY);
       if (corrId != null) {
          amqMsg.setCorrelationId(corrId.toString());
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/64724c35/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
index 57865b7..77051cc 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
@@ -286,7 +286,7 @@ public class AMQConsumer {
             for (MessageReference ref : ackList) {
                Throwable poisonCause = ack.getPoisonCause();
                if (poisonCause != null) {
-                  ref.getMessage().putStringProperty(OpenWireMessageConverter.AMQ_MSG_DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY, poisonCause.toString());
+                  ref.getMessage().putStringProperty(OpenWireMessageConverter.AMQ_MSG_DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY, new SimpleString(poisonCause.toString()));
                }
                ref.getQueue().sendToDeadLetterAddress(transaction, ref);
             }