You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by an...@apache.org on 2015/02/12 16:14:47 UTC

[1/5] activemq-6 git commit: ACTIVEMQ6-78 Adding tests to evaluate this task

Repository: activemq-6
Updated Branches:
  refs/heads/master 1d5a7a10d -> b8db8b051


ACTIVEMQ6-78 Adding tests to evaluate this task

https://issues.apache.org/jira/browse/ACTIVEMQ6-78

This commit only adds the tests I used to debug the blocked-calls issue.
There are also some profiling parameters you can use; I added them as a comment in the pom.

The reason this is a separate commit is that it makes it easier to validate the results of the
optimizations by running these tests before and after each change.


Project: http://git-wip-us.apache.org/repos/asf/activemq-6/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-6/commit/41b28f4b
Tree: http://git-wip-us.apache.org/repos/asf/activemq-6/tree/41b28f4b
Diff: http://git-wip-us.apache.org/repos/asf/activemq-6/diff/41b28f4b

Branch: refs/heads/master
Commit: 41b28f4b23cb42bb168a7b1dfb0fb6e5d6faa053
Parents: 1d5a7a1
Author: Clebert Suconic <cl...@apache.org>
Authored: Tue Feb 10 10:53:02 2015 -0500
Committer: Clebert Suconic <cl...@apache.org>
Committed: Wed Feb 11 12:47:01 2015 -0500

----------------------------------------------------------------------
 pom.xml                                         |   7 +-
 .../sends/AbstractSendReceivePerfTest.java      | 247 +++++++++++++++++++
 .../tests/performance/sends/ClientACKPerf.java  | 130 ++++++++++
 .../sends/MeasureCommitPerfTest.java            |  81 ++++++
 .../tests/performance/sends/PreACKPerf.java     |  93 +++++++
 5 files changed, 557 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-6/blob/41b28f4b/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index c3ae18a..4172145 100644
--- a/pom.xml
+++ b/pom.xml
@@ -79,7 +79,12 @@
        see https://intellij-support.jetbrains.com/entries/23395793
 
        Also see: http://youtrack.jetbrains.com/issue/IDEA-125696
-     -->
+
+
+       For profiling add this line and use jmc (Java Mission Control) to evaluate the results:
+           -XX:+UnlockCommercialFeatures -XX:+FlightRecorder -XX:StartFlightRecording=delay=30s,duration=120s,filename=/tmp/myrecording.jfr
+
+      -->
  
       <activemq-surefire-argline>-Djava.util.logging.manager=org.jboss.logmanager.LogManager
          -Dlogging.configuration=file:${activemq.basedir}/tests/config/logging.properties

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/41b28f4b/tests/performance-tests/src/test/java/org/apache/activemq/tests/performance/sends/AbstractSendReceivePerfTest.java
----------------------------------------------------------------------
diff --git a/tests/performance-tests/src/test/java/org/apache/activemq/tests/performance/sends/AbstractSendReceivePerfTest.java b/tests/performance-tests/src/test/java/org/apache/activemq/tests/performance/sends/AbstractSendReceivePerfTest.java
new file mode 100644
index 0000000..c8b2d7f
--- /dev/null
+++ b/tests/performance-tests/src/test/java/org/apache/activemq/tests/performance/sends/AbstractSendReceivePerfTest.java
@@ -0,0 +1,247 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.tests.performance.sends;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.activemq.api.core.TransportConfiguration;
+import org.apache.activemq.api.jms.ActiveMQJMSClient;
+import org.apache.activemq.core.settings.impl.AddressFullMessagePolicy;
+import org.apache.activemq.core.settings.impl.AddressSettings;
+import org.apache.activemq.tests.util.JMSTestBase;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Base class for the send/receive performance tests in this package.
+ * <p>
+ * {@link #testSendReceive()} runs one receiver thread and one sender thread against the queue
+ * {@link #Q_NAME}. Subclasses provide the consuming side through
+ * {@link #consumeMessages(Connection, String)} and may replace the default producer by overriding
+ * {@link #sendMessages(Connection, String)}.
+ * <p>
+ * The default producer is throttled by a semaphore of 5000 credits: it takes one credit per message
+ * in {@link #beforeSend()}, and a consumer gives it back by calling {@link #afterConsume(Message)}.
+ *
+ * @author Clebert Suconic
+ */
+public abstract class AbstractSendReceivePerfTest extends JMSTestBase
+{
+   private static final java.util.logging.Logger LOGGER = java.util.logging.Logger.getLogger(AbstractSendReceivePerfTest.class.getName());
+
+   protected static final String Q_NAME = "test-queue-01";
+
+   private Queue queue;
+
+   /** Cleared by the receiver thread when it finishes; the default sender loop stops once it is false. */
+   protected AtomicBoolean running = new AtomicBoolean(true);
+
+   /** Send credits shared by {@link #beforeSend()} (acquire) and {@link #afterConsume(Message)} (release). */
+   final Semaphore pendingCredit = new Semaphore(5000);
+
+
+   /**
+    * Creates the test queue and replaces every address setting with a single catch-all match
+    * that uses the BLOCK address-full policy and a {@code Long.MAX_VALUE} size limit.
+    */
+   @Override
+   @Before
+   public void setUp() throws Exception
+   {
+      super.setUp();
+
+      jmsServer.createQueue(false, Q_NAME, null, true, Q_NAME);
+      queue = ActiveMQJMSClient.createQueue(Q_NAME);
+
+      AddressSettings settings = new AddressSettings();
+      settings.setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK);
+      settings.setMaxSizeBytes(Long.MAX_VALUE);
+      server.getAddressSettingsRepository().clear();
+      server.getAddressSettingsRepository().addMatch("#", settings);
+
+   }
+
+
+   /**
+    * Registers a connection factory under "/cf" that uses the Netty connector and stores it in {@code cf}.
+    */
+   @Override
+   protected void registerConnectionFactory() throws Exception
+   {
+      List<TransportConfiguration> connectorConfigs = new ArrayList<TransportConfiguration>();
+      connectorConfigs.add(new TransportConfiguration(NETTY_CONNECTOR_FACTORY));
+
+      createCF(connectorConfigs, "/cf");
+
+      cf = (ConnectionFactory) namingContext.lookup("/cf");
+   }
+
+
+   /**
+    * Starts the receiver and the sender, waits for both threads and fails if either reported an error.
+    * <p>
+    * The sample count is read from the system property {@code HORNETQ_TEST_SAMPLES} (default 1000) and
+    * handed to the receiver, which currently stores it without using it.
+    */
+   @Test
+   public void testSendReceive() throws Exception
+   {
+      long numberOfSamples = Long.getLong("HORNETQ_TEST_SAMPLES", 1000);
+
+
+      MessageReceiver receiver = new MessageReceiver(Q_NAME, numberOfSamples);
+      receiver.start();
+      MessageSender sender = new MessageSender(Q_NAME);
+      sender.start();
+
+      receiver.join();
+      sender.join();
+
+      assertFalse(receiver.failed);
+      assertFalse(sender.failed);
+
+   }
+
+   /**
+    * To be called after a message is consumed so the flow control of the test kicks in:
+    * every non-null message returns one send credit.
+    *
+    * @param message the message just received, or {@code null} if the receive timed out
+    */
+   protected final void afterConsume(Message message)
+   {
+      if (message != null)
+      {
+         pendingCredit.release();
+      }
+   }
+
+
+   /**
+    * Waits for one send credit, retrying every second while the test is still running.
+    * Returns without a credit once {@link #running} has been cleared.
+    *
+    * @throws RuntimeException if the calling thread is interrupted while waiting; the interrupt
+    *                          status is restored before the exception is thrown
+    */
+   protected final void beforeSend()
+   {
+      while (running.get())
+      {
+         try
+         {
+            if (pendingCredit.tryAcquire(1, TimeUnit.SECONDS))
+            {
+               return;
+            }
+
+            System.out.println("Couldn't get credits!");
+         }
+         catch (InterruptedException e)
+         {
+            // keep the interrupt visible to callers further up the stack
+            Thread.currentThread().interrupt();
+            throw new RuntimeException("Interrupted while waiting for a send credit", e);
+         }
+      }
+   }
+
+
+   /**
+    * Closes the connection if one was opened.
+    *
+    * @return {@code false} if closing failed, {@code true} otherwise
+    */
+   private static boolean closeConnection(Connection connection)
+   {
+      if (connection == null)
+      {
+         return true;
+      }
+
+      try
+      {
+         connection.close();
+         return true;
+      }
+      catch (Exception e)
+      {
+         LOGGER.log(java.util.logging.Level.SEVERE, "Failed to close connection", e);
+         return false;
+      }
+   }
+
+
+   /** Thread that opens a connection and delegates to {@link #consumeMessages(Connection, String)}. */
+   private class MessageReceiver extends Thread
+   {
+      private final String qName;
+      private final long numberOfSamples;
+
+      public boolean failed = false;
+
+      public MessageReceiver(String qname, long numberOfSamples) throws Exception
+      {
+         super("Receiver " + qname);
+         this.qName = qname;
+         this.numberOfSamples = numberOfSamples;
+      }
+
+      @Override
+      public void run()
+      {
+         Connection c = null;
+         try
+         {
+            LOGGER.info("Receiver: Connecting");
+            c = cf.createConnection();
+
+            consumeMessages(c, qName);
+         }
+         catch (Exception e)
+         {
+            LOGGER.log(java.util.logging.Level.SEVERE, "Receiver failed", e);
+            failed = true;
+         }
+         finally
+         {
+            // close even when consuming failed, so the connection does not leak
+            if (!closeConnection(c))
+            {
+               failed = true;
+            }
+            running.set(false);
+         }
+      }
+   }
+
+   /**
+    * Consumes messages from the given queue; called on the receiver thread.
+    *
+    * @param c     an open connection that has not been started yet
+    * @param qName name of the queue to consume from
+    * @throws Exception on any failure; the receiver is then marked as failed
+    */
+   protected abstract void consumeMessages(Connection c, String qName) throws Exception;
+
+   /** Thread that opens a connection and delegates to {@link #sendMessages(Connection, String)}. */
+   private class MessageSender extends Thread
+   {
+      protected String qName;
+
+      public boolean failed = false;
+
+      public MessageSender(String qname) throws Exception
+      {
+         super("Sender " + qname);
+
+         this.qName = qname;
+      }
+
+      @Override
+      public void run()
+      {
+         Connection c = null;
+         try
+         {
+            LOGGER.info("Sender: Connecting");
+            c = cf.createConnection();
+
+            sendMessages(c, qName);
+         }
+         catch (Exception e)
+         {
+            failed = true;
+            // beforeSend() reports interruption wrapped in a RuntimeException, so check the cause too
+            if (e instanceof InterruptedException || e.getCause() instanceof InterruptedException)
+            {
+               LOGGER.info("Sender done.");
+            }
+            else
+            {
+               LOGGER.log(java.util.logging.Level.SEVERE, "Sender failed", e);
+            }
+         }
+         finally
+         {
+            // close even when sending failed, so the connection does not leak
+            if (!closeConnection(c))
+            {
+               failed = true;
+            }
+         }
+      }
+   }
+
+   /**
+    * Sends non persistent text messages to the queue until {@link #running} is cleared,
+    * taking one credit through {@link #beforeSend()} before each send.
+    * Subclasses may override this to exercise a different producer.
+    *
+    * @param c     an open connection
+    * @param qName name of the queue to send to
+    * @throws JMSException if the session or producer cannot be created, or a send fails
+    */
+   protected void sendMessages(Connection c, String qName) throws JMSException
+   {
+      Session s = c.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      LOGGER.info("Sender: Using AUTO-ACK session");
+
+
+      Queue q = s.createQueue(qName);
+      MessageProducer producer = s.createProducer(null);
+      producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+
+
+      long sent = 0;
+      while (running.get())
+      {
+         beforeSend();
+         if (!running.get())
+         {
+            // the receiver finished while we were waiting for a credit; do not send any more
+            break;
+         }
+         producer.send(q, s.createTextMessage("Message_" + (sent++)));
+      }
+   }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/41b28f4b/tests/performance-tests/src/test/java/org/apache/activemq/tests/performance/sends/ClientACKPerf.java
----------------------------------------------------------------------
diff --git a/tests/performance-tests/src/test/java/org/apache/activemq/tests/performance/sends/ClientACKPerf.java b/tests/performance-tests/src/test/java/org/apache/activemq/tests/performance/sends/ClientACKPerf.java
new file mode 100644
index 0000000..6dba4f4
--- /dev/null
+++ b/tests/performance-tests/src/test/java/org/apache/activemq/tests/performance/sends/ClientACKPerf.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.tests.performance.sends;
+
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+/**
+ * Measures the time spent in {@code Message.acknowledge()} on a CLIENT_ACKNOWLEDGE session,
+ * acknowledging once every {@code batchSize} messages. The test runs once for each batch size
+ * returned by {@link #data()}.
+ *
+ * @author clebertsuconic
+ */
+@RunWith(Parameterized.class)
+public class ClientACKPerf extends AbstractSendReceivePerfTest
+{
+
+   /** Number of messages received between two calls to {@code acknowledge()}. */
+   public final int batchSize;
+
+   /**
+    * @return the batch sizes to run the test with
+    */
+   @Parameterized.Parameters(name = "batchSize={0}")
+   public static Collection<Object[]> data()
+   {
+      List<Object[]> list = Arrays.asList(new Object[][]{
+         {1},
+         {2000}});
+
+      System.out.println("Size = " + list.size());
+      return list;
+   }
+
+   /**
+    * @param batchSize number of messages per acknowledge call; must be positive
+    * @throws IllegalArgumentException if {@code batchSize} is not positive
+    */
+   public ClientACKPerf(int batchSize)
+   {
+      super();
+      if (batchSize <= 0)
+      {
+         // the receive loop computes "count % batchSize", which would fail for zero
+         throw new IllegalArgumentException("batchSize must be positive: " + batchSize);
+      }
+      this.batchSize = batchSize;
+   }
+
+
+   /**
+    * Receives messages for 60 seconds, acknowledging the first message of every batch and
+    * accumulating the time spent inside {@code acknowledge()} in nanoseconds.
+    * A progress line is printed every 10000 messages and once more at the end.
+    *
+    * @throws Exception if no message arrives within 5 seconds, or on any JMS failure
+    */
+   @Override
+   protected void consumeMessages(Connection c, String qName) throws Exception
+   {
+      System.out.println("Receiver: Using CLIENT-ACK mode");
+
+      Session s = c.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+      Queue q = s.createQueue(qName);
+      MessageConsumer consumer = s.createConsumer(q, null, false);
+
+      c.start();
+
+      // nanoseconds spent inside acknowledge()
+      long totalTimeACKTime = 0;
+
+      long start = System.currentTimeMillis();
+
+      long nmessages = 0;
+      long timeout = System.currentTimeMillis() + 60 * 1000;
+      while (timeout > System.currentTimeMillis())
+      {
+         Message m = consumer.receive(5000);
+         // returns a send credit to the sender; ignores null
+         afterConsume(m);
+
+
+         if (m == null)
+         {
+            throw new Exception("Failed with m = null");
+         }
+
+         if (nmessages++ % batchSize == 0)
+         {
+            long startACK = System.nanoTime();
+            m.acknowledge();
+            long endACK = System.nanoTime();
+            totalTimeACKTime += (endACK - startACK);
+         }
+
+
+         if (nmessages % 10000 == 0)
+         {
+            printMsgsSec(start, nmessages, totalTimeACKTime);
+         }
+      }
+
+
+      printMsgsSec(start, nmessages, totalTimeACKTime);
+   }
+
+
+
+   /**
+    * Prints throughput and acknowledge timing for the messages received so far.
+    *
+    * @param start            wall-clock time in milliseconds at which consuming started
+    * @param nmessages        number of messages received so far
+    * @param totalTimeACKTime total time spent in {@code acknowledge()}, in nanoseconds
+    */
+   protected void printMsgsSec(final long start, final double nmessages, final double totalTimeACKTime)
+   {
+
+      long end = System.currentTimeMillis();
+      double elapsed = ((double) end - (double) start) / 1000f;
+
+      double messagesPerSecond = nmessages / elapsed;
+      // the loop acknowledges the first message of each batch, i.e. ceil(nmessages / batchSize) times
+      double nAcks = Math.ceil(nmessages / batchSize);
+
+      System.out.println("batchSize=" + batchSize + ", numberOfMessages="
+                            + nmessages + ", elapsedTime=" + elapsed + " msgs/sec= " + messagesPerSecond + ",totalTimeAcking=" +  String.format("%10.4f", totalTimeACKTime) +
+                            ", avgACKTime=" + String.format("%10.4f", (totalTimeACKTime / nAcks)));
+
+   }
+
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/41b28f4b/tests/performance-tests/src/test/java/org/apache/activemq/tests/performance/sends/MeasureCommitPerfTest.java
----------------------------------------------------------------------
diff --git a/tests/performance-tests/src/test/java/org/apache/activemq/tests/performance/sends/MeasureCommitPerfTest.java b/tests/performance-tests/src/test/java/org/apache/activemq/tests/performance/sends/MeasureCommitPerfTest.java
new file mode 100644
index 0000000..0528d0e
--- /dev/null
+++ b/tests/performance-tests/src/test/java/org/apache/activemq/tests/performance/sends/MeasureCommitPerfTest.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.tests.performance.sends;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Session;
+
+
+/**
+ * Measures how many commits per second a transacted session achieves. No messages are sent or
+ * consumed: the sender thread calls {@code Session.commit()} in a loop for 30 seconds and starts
+ * counting once the first 5 seconds have passed.
+ *
+ * @author clebertsuconic
+ */
+
+public class MeasureCommitPerfTest extends AbstractSendReceivePerfTest
+{
+   /** Nothing to consume: this test only measures the commits issued by {@link #sendMessages}. */
+   @Override
+   protected void consumeMessages(Connection c, String qName) throws Exception
+   {
+   }
+
+
+   /**
+    * Commits an empty transaction in a loop for 30 seconds. The first 5 seconds are treated as
+    * warm-up; after that the commit counter restarts and a progress line is printed every 1000 commits.
+    *
+    * @param c     an open connection
+    * @param qName unused by this test
+    * @throws JMSException if the session cannot be created or a commit fails
+    */
+   @Override
+   protected void sendMessages(Connection c, String qName) throws JMSException
+   {
+      Session s = c.createSession(true, Session.SESSION_TRANSACTED);
+
+      try
+      {
+         long timeout = System.currentTimeMillis() + 30 * 1000;
+
+         long startMeasure = System.currentTimeMillis() + 5000;
+         // stays 0 until the warm-up period is over
+         long start = 0;
+         long committs = 0;
+         while (timeout > System.currentTimeMillis())
+         {
+
+            if (start == 0 && System.currentTimeMillis() > startMeasure)
+            {
+               System.out.println("heat up");
+               start = System.currentTimeMillis();
+               committs = 0;
+            }
+
+            s.commit();
+            committs++;
+            if (start > 0 && committs % 1000 == 0)
+            {
+               printCommitsSecond(start, committs);
+            }
+         }
+
+         if (start > 0)
+         {
+            // only report when the measurement actually started; otherwise the rate is meaningless
+            printCommitsSecond(start, committs);
+         }
+      }
+      finally
+      {
+         // release the session even when a commit fails
+         s.close();
+      }
+   }
+
+
+   /**
+    * Prints the number of commits and the commit rate since {@code start}.
+    *
+    * @param start    wall-clock time in milliseconds at which the measurement started
+    * @param committs number of commits performed since {@code start}
+    */
+   protected void printCommitsSecond(final long start, final double committs)
+   {
+
+      long end = System.currentTimeMillis();
+      double elapsed = ((double) end - (double) start) / 1000f;
+
+      double commitsPerSecond = committs / elapsed;
+
+      System.out.println("end = " + end + ", start=" + start + ", numberOfCommits="
+                            + committs + ", elapsed=" + elapsed + " commits/sec= " + commitsPerSecond);
+
+   }
+
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/41b28f4b/tests/performance-tests/src/test/java/org/apache/activemq/tests/performance/sends/PreACKPerf.java
----------------------------------------------------------------------
diff --git a/tests/performance-tests/src/test/java/org/apache/activemq/tests/performance/sends/PreACKPerf.java b/tests/performance-tests/src/test/java/org/apache/activemq/tests/performance/sends/PreACKPerf.java
new file mode 100644
index 0000000..a6d2906
--- /dev/null
+++ b/tests/performance-tests/src/test/java/org/apache/activemq/tests/performance/sends/PreACKPerf.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.tests.performance.sends;
+
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+import org.apache.activemq.api.jms.ActiveMQJMSConstants;
+
+/**
+ * Measures consumer throughput on a session created with
+ * {@code ActiveMQJMSConstants.PRE_ACKNOWLEDGE}, using the default sender of the base class.
+ *
+ * @author clebertsuconic
+ */
+
+public class PreACKPerf extends AbstractSendReceivePerfTest
+{
+   /**
+    * Receives messages for 30 seconds and prints the throughput every 10000 messages and once
+    * more at the end.
+    *
+    * @throws Exception if no message arrives within 5 seconds, or on any JMS failure
+    */
+   @Override
+   protected void consumeMessages(Connection c, String qName) throws Exception
+   {
+      System.out.println("Receiver: Using PRE-ACK mode");
+
+      Session s = c.createSession(false, ActiveMQJMSConstants.PRE_ACKNOWLEDGE);
+      Queue q = s.createQueue(qName);
+      MessageConsumer consumer = s.createConsumer(q, null, false);
+
+      c.start();
+
+      long start = System.currentTimeMillis();
+
+      long nmessages = 0;
+      long timeout = System.currentTimeMillis() + 30 * 1000;
+      while (timeout > System.currentTimeMillis())
+      {
+         Message m = consumer.receive(5000);
+
+         // Return the send credit taken by the sender in beforeSend(). Without this the sender
+         // runs out of credits and this loop then fails on a null message.
+         afterConsume(m);
+
+         if (m == null)
+         {
+            throw new Exception("Failed with m = null");
+         }
+
+         nmessages++;
+
+         if (nmessages % 10000 == 0)
+         {
+            printMsgsSec(start, nmessages);
+         }
+
+      }
+
+      printMsgsSec(start, nmessages);
+   }
+
+
+
+   /**
+    * Prints the number of messages and the message rate since {@code start}.
+    *
+    * @param start     wall-clock time in milliseconds at which consuming started
+    * @param nmessages number of messages received so far
+    */
+   protected void printMsgsSec(final long start, final double nmessages)
+   {
+
+      long end = System.currentTimeMillis();
+      double elapsed = ((double) end - (double) start) / 1000f;
+
+      double messagesPerSecond = nmessages / elapsed;
+
+      System.out.println("end = " + end + ", start=" + start + ", numberOfMessages="
+                            + nmessages + ", elapsed=" + elapsed + " msgs/sec= " + messagesPerSecond);
+
+   }
+
+
+}


[3/5] activemq-6 git commit: ACTIVEMQ6-78 Improving performance over Netty NIO blocked calls

Posted by an...@apache.org.
ACTIVEMQ6-78 Improving performance over Netty NIO blocked calls

https://issues.apache.org/jira/browse/ACTIVEMQ6-78 performance work

There are two aspects to this work. The first is to avoid asynchronous packets and the context
switches over the executors. Packet had a method that made certain packets, such as commit,
use a different executor. Since this is NIO, everything is now done on the Netty thread.

The second aspect was to make sure we use the proper buffering


Project: http://git-wip-us.apache.org/repos/asf/activemq-6/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-6/commit/f7c4d56c
Tree: http://git-wip-us.apache.org/repos/asf/activemq-6/tree/f7c4d56c
Diff: http://git-wip-us.apache.org/repos/asf/activemq-6/diff/f7c4d56c

Branch: refs/heads/master
Commit: f7c4d56cc771e2d4ff54fd2796ad2cdfae9f5e13
Parents: 41b28f4
Author: Clebert Suconic <cl...@apache.org>
Authored: Tue Feb 10 11:30:18 2015 -0500
Committer: Clebert Suconic <cl...@apache.org>
Committed: Wed Feb 11 12:47:14 2015 -0500

----------------------------------------------------------------------
 .../core/buffers/impl/ChannelBufferWrapper.java | 12 +++++++
 .../impl/ResetLimitWrappedActiveMQBuffer.java   |  4 ++-
 .../activemq/core/protocol/core/Packet.java     |  2 --
 .../impl/ActiveMQClientProtocolManager.java     |  2 +-
 .../core/protocol/core/impl/PacketImpl.java     |  7 +---
 .../core/impl/RemotingConnectionImpl.java       | 37 ++------------------
 .../core/impl/wireformat/RollbackMessage.java   |  6 ----
 .../impl/wireformat/SessionCloseMessage.java    |  6 ----
 .../impl/wireformat/SessionCommitMessage.java   |  5 ---
 .../impl/wireformat/SessionXACommitMessage.java |  6 ----
 .../wireformat/SessionXAPrepareMessage.java     |  6 ----
 .../wireformat/SessionXARollbackMessage.java    |  6 ----
 .../remoting/impl/netty/NettyConnection.java    |  6 ++--
 .../remoting/impl/netty/NettyConnector.java     |  3 +-
 .../protocol/AbstractRemotingConnection.java    |  4 +--
 .../spi/core/protocol/RemotingConnection.java   |  3 +-
 .../activemq/spi/core/remoting/Connection.java  |  2 +-
 .../protocol/openwire/OpenWireConnection.java   |  2 +-
 .../core/protocol/stomp/StompConnection.java    |  2 +-
 .../core/remoting/impl/invm/InVMConnection.java |  2 +-
 .../impl/netty/NettyServerConnection.java       |  2 +-
 .../jms/tests/message/MessageHeaderTest.java    |  4 +--
 .../impl/netty/NettyConnectionTest.java         |  2 +-
 23 files changed, 35 insertions(+), 96 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-6/blob/f7c4d56c/activemq-commons/src/main/java/org/apache/activemq/core/buffers/impl/ChannelBufferWrapper.java
----------------------------------------------------------------------
diff --git a/activemq-commons/src/main/java/org/apache/activemq/core/buffers/impl/ChannelBufferWrapper.java b/activemq-commons/src/main/java/org/apache/activemq/core/buffers/impl/ChannelBufferWrapper.java
index a3fa5b5..53d7306 100644
--- a/activemq-commons/src/main/java/org/apache/activemq/core/buffers/impl/ChannelBufferWrapper.java
+++ b/activemq-commons/src/main/java/org/apache/activemq/core/buffers/impl/ChannelBufferWrapper.java
@@ -35,6 +35,18 @@ public class ChannelBufferWrapper implements ActiveMQBuffer
    protected ByteBuf buffer; // NO_UCD (use final)
    private final boolean releasable;
 
+   /**
+    * Strips nested wrapper layers from a buffer by calling {@code ByteBuf.unwrap()} repeatedly.
+    *
+    * @param buffer the buffer to start from
+    * @return the last buffer reached before {@code unwrap()} returned {@code null} or the same instance
+    */
+   public static ByteBuf unwrap(ByteBuf buffer)
+   {
+      ByteBuf current = buffer;
+      for (;;)
+      {
+         final ByteBuf inner = current.unwrap();
+         // the identity check is just in case the semantic ever changes
+         // so that unwrap returns the buffer itself
+         if (inner == null || inner == current)
+         {
+            return current;
+         }
+         current = inner;
+      }
+   }
+
+
    public ChannelBufferWrapper(final ByteBuf buffer)
    {
       this(buffer, false);

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/f7c4d56c/activemq-core-client/src/main/java/org/apache/activemq/core/buffers/impl/ResetLimitWrappedActiveMQBuffer.java
----------------------------------------------------------------------
diff --git a/activemq-core-client/src/main/java/org/apache/activemq/core/buffers/impl/ResetLimitWrappedActiveMQBuffer.java b/activemq-core-client/src/main/java/org/apache/activemq/core/buffers/impl/ResetLimitWrappedActiveMQBuffer.java
index 96b3e2b..1cd342d 100644
--- a/activemq-core-client/src/main/java/org/apache/activemq/core/buffers/impl/ResetLimitWrappedActiveMQBuffer.java
+++ b/activemq-core-client/src/main/java/org/apache/activemq/core/buffers/impl/ResetLimitWrappedActiveMQBuffer.java
@@ -46,7 +46,9 @@ public final class ResetLimitWrappedActiveMQBuffer extends ChannelBufferWrapper
 
    public ResetLimitWrappedActiveMQBuffer(final int limit, final ActiveMQBuffer buffer, final MessageInternal message)
    {
-      super(buffer.byteBuf());
+      // a wrapped inside a wrapper will increase the stack size.
+      // we fixed this here due to some profiling testing
+      super(unwrap(buffer.byteBuf()));
 
       this.limit = limit;
 

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/f7c4d56c/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/Packet.java
----------------------------------------------------------------------
diff --git a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/Packet.java b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/Packet.java
index 4027c67..6b23bff 100644
--- a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/Packet.java
+++ b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/Packet.java
@@ -85,6 +85,4 @@ public interface Packet
     * @return true if confirmation is required
     */
    boolean isRequiresConfirmations();
-
-   boolean isAsyncExec();
 }

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/f7c4d56c/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/ActiveMQClientProtocolManager.java
----------------------------------------------------------------------
diff --git a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/ActiveMQClientProtocolManager.java b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/ActiveMQClientProtocolManager.java
index b7366df..890e8d0 100644
--- a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/ActiveMQClientProtocolManager.java
+++ b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/ActiveMQClientProtocolManager.java
@@ -482,7 +482,7 @@ public class ActiveMQClientProtocolManager implements ClientProtocolManager
       {
          // no need to send handshake on inVM as inVM is not using the NettyProtocolHandling
          String handshake = "HORNETQ";
-         ActiveMQBuffer amqbuffer = connection.createBuffer(handshake.length());
+         ActiveMQBuffer amqbuffer = connection.createTransportBuffer(handshake.length());
          amqbuffer.writeBytes(handshake.getBytes());
          transportConnection.write(amqbuffer);
       }

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/f7c4d56c/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/PacketImpl.java
----------------------------------------------------------------------
diff --git a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/PacketImpl.java b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/PacketImpl.java
index e2bdc28..61e0eca 100644
--- a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/PacketImpl.java
+++ b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/PacketImpl.java
@@ -276,7 +276,7 @@ public class PacketImpl implements Packet
 
    public ActiveMQBuffer encode(final RemotingConnection connection)
    {
-      ActiveMQBuffer buffer = connection.createBuffer(PacketImpl.INITIAL_PACKET_SIZE);
+      ActiveMQBuffer buffer = connection.createTransportBuffer(PacketImpl.INITIAL_PACKET_SIZE);
 
       // The standard header fields
 
@@ -333,11 +333,6 @@ public class PacketImpl implements Packet
       return true;
    }
 
-   public boolean isAsyncExec()
-   {
-      return false;
-   }
-
    @Override
    public String toString()
    {

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/f7c4d56c/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/RemotingConnectionImpl.java
----------------------------------------------------------------------
diff --git a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/RemotingConnectionImpl.java b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/RemotingConnectionImpl.java
index 5ef4cfd..8820850 100644
--- a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/RemotingConnectionImpl.java
+++ b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/RemotingConnectionImpl.java
@@ -81,8 +81,6 @@ public class RemotingConnectionImpl extends AbstractRemotingConnection implement
 
    private final Object failLock = new Object();
 
-   private volatile boolean executing;
-
    private final SimpleString nodeID;
 
    private String clientID;
@@ -381,39 +379,8 @@ public class RemotingConnectionImpl extends AbstractRemotingConnection implement
             ActiveMQClientLogger.LOGGER.trace("handling packet " + packet);
          }
 
-         if (packet.isAsyncExec() && executor != null)
-         {
-            executing = true;
-
-            executor.execute(new Runnable()
-            {
-               public void run()
-               {
-                  try
-                  {
-                     doBufferReceived(packet);
-                  }
-                  catch (Throwable t)
-                  {
-                     ActiveMQClientLogger.LOGGER.errorHandlingPacket(t, packet);
-                  }
-
-                  executing = false;
-               }
-            });
-         }
-         else
-         {
-            //To prevent out of order execution if interleaving sync and async operations on same connection
-            while (executing)
-            {
-               Thread.yield();
-            }
-
-            // Pings must always be handled out of band so we can send pings back to the client quickly
-            // otherwise they would get in the queue with everything else which might give an intolerable delay
-            doBufferReceived(packet);
-         }
+         dataReceived = true;
+         doBufferReceived(packet);
 
          super.bufferReceived(connectionID, buffer);
       }

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/f7c4d56c/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/RollbackMessage.java
----------------------------------------------------------------------
diff --git a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/RollbackMessage.java b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/RollbackMessage.java
index 41c5735..340c73a 100644
--- a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/RollbackMessage.java
+++ b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/RollbackMessage.java
@@ -70,12 +70,6 @@ public class RollbackMessage extends PacketImpl
    }
 
    @Override
-   public boolean isAsyncExec()
-   {
-      return true;
-   }
-
-   @Override
    public int hashCode()
    {
       final int prime = 31;

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/f7c4d56c/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/SessionCloseMessage.java
----------------------------------------------------------------------
diff --git a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/SessionCloseMessage.java b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/SessionCloseMessage.java
index 1c8a276..dc61860 100644
--- a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/SessionCloseMessage.java
+++ b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/SessionCloseMessage.java
@@ -49,10 +49,4 @@ public class SessionCloseMessage extends PacketImpl
       // TODO
       return 0;
    }
-
-   @Override
-   public boolean isAsyncExec()
-   {
-      return true;
-   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/f7c4d56c/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/SessionCommitMessage.java
----------------------------------------------------------------------
diff --git a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/SessionCommitMessage.java b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/SessionCommitMessage.java
index c7242fb..1b1e081 100644
--- a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/SessionCommitMessage.java
+++ b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/SessionCommitMessage.java
@@ -30,9 +30,4 @@ public class SessionCommitMessage extends PacketImpl
       super(SESS_COMMIT);
    }
 
-   @Override
-   public boolean isAsyncExec()
-   {
-      return true;
-   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/f7c4d56c/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/SessionXACommitMessage.java
----------------------------------------------------------------------
diff --git a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/SessionXACommitMessage.java b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/SessionXACommitMessage.java
index 65fdf33..14668cb 100644
--- a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/SessionXACommitMessage.java
+++ b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/SessionXACommitMessage.java
@@ -56,12 +56,6 @@ public class SessionXACommitMessage extends PacketImpl
    }
 
    @Override
-   public boolean isAsyncExec()
-   {
-      return true;
-   }
-
-   @Override
    public void encodeRest(final ActiveMQBuffer buffer)
    {
       XidCodecSupport.encodeXid(xid, buffer);

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/f7c4d56c/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/SessionXAPrepareMessage.java
----------------------------------------------------------------------
diff --git a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/SessionXAPrepareMessage.java b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/SessionXAPrepareMessage.java
index b9a531a..8ff5b15 100644
--- a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/SessionXAPrepareMessage.java
+++ b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/SessionXAPrepareMessage.java
@@ -62,12 +62,6 @@ public class SessionXAPrepareMessage extends PacketImpl
    }
 
    @Override
-   public boolean isAsyncExec()
-   {
-      return true;
-   }
-
-   @Override
    public int hashCode()
    {
       final int prime = 31;

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/f7c4d56c/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/SessionXARollbackMessage.java
----------------------------------------------------------------------
diff --git a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/SessionXARollbackMessage.java b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/SessionXARollbackMessage.java
index 8efab01..272386d 100644
--- a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/SessionXARollbackMessage.java
+++ b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/SessionXARollbackMessage.java
@@ -63,12 +63,6 @@ public class SessionXARollbackMessage extends PacketImpl
    }
 
    @Override
-   public boolean isAsyncExec()
-   {
-      return true;
-   }
-
-   @Override
    public int hashCode()
    {
       final int prime = 31;

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/f7c4d56c/activemq-core-client/src/main/java/org/apache/activemq/core/remoting/impl/netty/NettyConnection.java
----------------------------------------------------------------------
diff --git a/activemq-core-client/src/main/java/org/apache/activemq/core/remoting/impl/netty/NettyConnection.java b/activemq-core-client/src/main/java/org/apache/activemq/core/remoting/impl/netty/NettyConnection.java
index a73aa1b..c7eafe7 100644
--- a/activemq-core-client/src/main/java/org/apache/activemq/core/remoting/impl/netty/NettyConnection.java
+++ b/activemq-core-client/src/main/java/org/apache/activemq/core/remoting/impl/netty/NettyConnection.java
@@ -172,9 +172,9 @@ public class NettyConnection implements Connection
       listener.connectionDestroyed(getID());
    }
 
-   public ActiveMQBuffer createBuffer(final int size)
+   public ActiveMQBuffer createTransportBuffer(final int size)
    {
-      return new ChannelBufferWrapper(channel.alloc().buffer(size));
+      return new ChannelBufferWrapper(PartialPooledByteBufAllocator.INSTANCE.directBuffer(size), true);
    }
 
    public Object getID()
@@ -199,7 +199,7 @@ public class NettyConnection implements Connection
             {
                channel.writeAndFlush(batchBuffer.byteBuf());
 
-               batchBuffer = createBuffer(BATCHING_BUFFER_SIZE);
+               batchBuffer = createTransportBuffer(BATCHING_BUFFER_SIZE);
             }
          }
          finally

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/f7c4d56c/activemq-core-client/src/main/java/org/apache/activemq/core/remoting/impl/netty/NettyConnector.java
----------------------------------------------------------------------
diff --git a/activemq-core-client/src/main/java/org/apache/activemq/core/remoting/impl/netty/NettyConnector.java b/activemq-core-client/src/main/java/org/apache/activemq/core/remoting/impl/netty/NettyConnector.java
index 3ed9e87..8425edd 100644
--- a/activemq-core-client/src/main/java/org/apache/activemq/core/remoting/impl/netty/NettyConnector.java
+++ b/activemq-core-client/src/main/java/org/apache/activemq/core/remoting/impl/netty/NettyConnector.java
@@ -47,7 +47,6 @@ import java.util.concurrent.TimeUnit;
 import io.netty.bootstrap.Bootstrap;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
-import io.netty.buffer.UnpooledByteBufAllocator;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelDuplexHandler;
 import io.netty.channel.ChannelFuture;
@@ -477,7 +476,7 @@ public class NettyConnector extends AbstractConnector
       }
       bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
       bootstrap.option(ChannelOption.SO_REUSEADDR, true);
-      bootstrap.option(ChannelOption.ALLOCATOR, new UnpooledByteBufAllocator(false));
+      bootstrap.option(ChannelOption.ALLOCATOR, PartialPooledByteBufAllocator.INSTANCE);
       channelGroup = new DefaultChannelGroup("activemq-connector", GlobalEventExecutor.INSTANCE);
 
       final SSLContext context;

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/f7c4d56c/activemq-core-client/src/main/java/org/apache/activemq/spi/core/protocol/AbstractRemotingConnection.java
----------------------------------------------------------------------
diff --git a/activemq-core-client/src/main/java/org/apache/activemq/spi/core/protocol/AbstractRemotingConnection.java b/activemq-core-client/src/main/java/org/apache/activemq/spi/core/protocol/AbstractRemotingConnection.java
index a48845f..22b26ee 100644
--- a/activemq-core-client/src/main/java/org/apache/activemq/spi/core/protocol/AbstractRemotingConnection.java
+++ b/activemq-core-client/src/main/java/org/apache/activemq/spi/core/protocol/AbstractRemotingConnection.java
@@ -182,9 +182,9 @@ public abstract class AbstractRemotingConnection implements RemotingConnection
       closeListeners.addAll(listeners);
    }
 
-   public ActiveMQBuffer createBuffer(final int size)
+   public ActiveMQBuffer createTransportBuffer(final int size)
    {
-      return transportConnection.createBuffer(size);
+      return transportConnection.createTransportBuffer(size);
    }
 
    public Connection getTransportConnection()

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/f7c4d56c/activemq-core-client/src/main/java/org/apache/activemq/spi/core/protocol/RemotingConnection.java
----------------------------------------------------------------------
diff --git a/activemq-core-client/src/main/java/org/apache/activemq/spi/core/protocol/RemotingConnection.java b/activemq-core-client/src/main/java/org/apache/activemq/spi/core/protocol/RemotingConnection.java
index 186e098..9eb287e 100644
--- a/activemq-core-client/src/main/java/org/apache/activemq/spi/core/protocol/RemotingConnection.java
+++ b/activemq-core-client/src/main/java/org/apache/activemq/spi/core/protocol/RemotingConnection.java
@@ -115,11 +115,12 @@ public interface RemotingConnection extends BufferHandler
 
    /**
     * creates a new ActiveMQBuffer of the specified size.
+    * For the purpose of i/o outgoing packets
     *
     * @param size the size of buffer required
     * @return the buffer
     */
-   ActiveMQBuffer createBuffer(int size);
+   ActiveMQBuffer createTransportBuffer(int size);
 
    /**
     * called when the underlying connection fails.

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/f7c4d56c/activemq-core-client/src/main/java/org/apache/activemq/spi/core/remoting/Connection.java
----------------------------------------------------------------------
diff --git a/activemq-core-client/src/main/java/org/apache/activemq/spi/core/remoting/Connection.java b/activemq-core-client/src/main/java/org/apache/activemq/spi/core/remoting/Connection.java
index a4896e2..b6060d3 100644
--- a/activemq-core-client/src/main/java/org/apache/activemq/spi/core/remoting/Connection.java
+++ b/activemq-core-client/src/main/java/org/apache/activemq/spi/core/remoting/Connection.java
@@ -36,7 +36,7 @@ public interface Connection
     * @param size the size of buffer to create
     * @return the new buffer.
     */
-   ActiveMQBuffer createBuffer(int size);
+   ActiveMQBuffer createTransportBuffer(int size);
 
 
    RemotingConnection getProtocolConnection();

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/f7c4d56c/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/OpenWireConnection.java
----------------------------------------------------------------------
diff --git a/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/OpenWireConnection.java b/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/OpenWireConnection.java
index 20762e2..bf906bc 100644
--- a/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/OpenWireConnection.java
+++ b/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/OpenWireConnection.java
@@ -440,7 +440,7 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor
    }
 
    @Override
-   public ActiveMQBuffer createBuffer(int size)
+   public ActiveMQBuffer createTransportBuffer(int size)
    {
       return ActiveMQBuffers.dynamicBuffer(size);
    }

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/f7c4d56c/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/StompConnection.java
----------------------------------------------------------------------
diff --git a/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/StompConnection.java b/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/StompConnection.java
index 16cf55e..9a4e7b7 100644
--- a/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/StompConnection.java
+++ b/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/StompConnection.java
@@ -280,7 +280,7 @@ public final class StompConnection implements RemotingConnection
    }
 
    @Override
-   public ActiveMQBuffer createBuffer(int size)
+   public ActiveMQBuffer createTransportBuffer(int size)
    {
       return ActiveMQBuffers.dynamicBuffer(size);
    }

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/f7c4d56c/activemq-server/src/main/java/org/apache/activemq/core/remoting/impl/invm/InVMConnection.java
----------------------------------------------------------------------
diff --git a/activemq-server/src/main/java/org/apache/activemq/core/remoting/impl/invm/InVMConnection.java b/activemq-server/src/main/java/org/apache/activemq/core/remoting/impl/invm/InVMConnection.java
index eed1ff4..37e2acb 100644
--- a/activemq-server/src/main/java/org/apache/activemq/core/remoting/impl/invm/InVMConnection.java
+++ b/activemq-server/src/main/java/org/apache/activemq/core/remoting/impl/invm/InVMConnection.java
@@ -142,7 +142,7 @@ public class InVMConnection implements Connection
       }
    }
 
-   public ActiveMQBuffer createBuffer(final int size)
+   public ActiveMQBuffer createTransportBuffer(final int size)
    {
       return ActiveMQBuffers.dynamicBuffer(size);
    }

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/f7c4d56c/activemq-server/src/main/java/org/apache/activemq/core/remoting/impl/netty/NettyServerConnection.java
----------------------------------------------------------------------
diff --git a/activemq-server/src/main/java/org/apache/activemq/core/remoting/impl/netty/NettyServerConnection.java b/activemq-server/src/main/java/org/apache/activemq/core/remoting/impl/netty/NettyServerConnection.java
index d899e85..339b407 100644
--- a/activemq-server/src/main/java/org/apache/activemq/core/remoting/impl/netty/NettyServerConnection.java
+++ b/activemq-server/src/main/java/org/apache/activemq/core/remoting/impl/netty/NettyServerConnection.java
@@ -34,7 +34,7 @@ public class NettyServerConnection extends NettyConnection
    }
 
    @Override
-   public ActiveMQBuffer createBuffer(int size)
+   public ActiveMQBuffer createTransportBuffer(int size)
    {
       return new ChannelBufferWrapper(channel.alloc().directBuffer(size), true);
    }

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/f7c4d56c/tests/jms-tests/src/test/java/org/apache/activemq/jms/tests/message/MessageHeaderTest.java
----------------------------------------------------------------------
diff --git a/tests/jms-tests/src/test/java/org/apache/activemq/jms/tests/message/MessageHeaderTest.java b/tests/jms-tests/src/test/java/org/apache/activemq/jms/tests/message/MessageHeaderTest.java
index 784a0c0..cfbf995 100644
--- a/tests/jms-tests/src/test/java/org/apache/activemq/jms/tests/message/MessageHeaderTest.java
+++ b/tests/jms-tests/src/test/java/org/apache/activemq/jms/tests/message/MessageHeaderTest.java
@@ -1391,7 +1391,7 @@ public class MessageHeaderTest extends MessageHeaderTestBase
       }
 
       /* (non-Javadoc)
-       * @see org.apache.activemq.api.core.client.ClientSession#createBuffer(byte[])
+       * @see org.apache.activemq.api.core.client.ClientSession#createTransportBuffer(byte[])
        */
       public ActiveMQBuffer createBuffer(final byte[] bytes)
       {
@@ -1400,7 +1400,7 @@ public class MessageHeaderTest extends MessageHeaderTestBase
       }
 
       /* (non-Javadoc)
-       * @see org.apache.activemq.api.core.client.ClientSession#createBuffer(int)
+       * @see org.apache.activemq.api.core.client.ClientSession#createTransportBuffer(int)
        */
       public ActiveMQBuffer createBuffer(final int size)
       {

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/f7c4d56c/tests/unit-tests/src/test/java/org/apache/activemq/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java
----------------------------------------------------------------------
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java
index cc9d57b..b0a4c14 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java
@@ -74,7 +74,7 @@ public class NettyConnectionTest extends UnitTestCase
 
       final int size = 1234;
 
-      ActiveMQBuffer buff = conn.createBuffer(size);
+      ActiveMQBuffer buff = conn.createTransportBuffer(size);
       buff.writeByte((byte) 0x00); // Netty buffer does lazy initialization.
       Assert.assertEquals(size, buff.capacity());
 


[2/5] activemq-6 git commit: Fixing test failure

Posted by an...@apache.org.
Fixing test failure

This test started to fail after the performance improvements from ACTIVEMQ6-78.
After some investigation, it turned out that the test was racing with the client:
the client could crash before the queue was even created, or it could send a
non-persistent message asynchronously and crash before the server received it.

I've also decreased some of the ping times so the test can run a bit faster.


Project: http://git-wip-us.apache.org/repos/asf/activemq-6/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-6/commit/60179cfc
Tree: http://git-wip-us.apache.org/repos/asf/activemq-6/tree/60179cfc
Diff: http://git-wip-us.apache.org/repos/asf/activemq-6/diff/60179cfc

Branch: refs/heads/master
Commit: 60179cfc980fba4756d588c3a9d8de44be3e9960
Parents: f7c4d56
Author: Clebert Suconic <cl...@apache.org>
Authored: Wed Feb 11 08:36:17 2015 -0500
Committer: Clebert Suconic <cl...@apache.org>
Committed: Wed Feb 11 12:47:14 2015 -0500

----------------------------------------------------------------------
 .../integration/clientcrash/ClientCrashTest.java  | 18 ++++++++++++------
 .../integration/clientcrash/CrashClient.java      |  3 ++-
 .../integration/clientcrash/CrashClient2.java     |  2 +-
 3 files changed, 15 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-6/blob/60179cfc/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/clientcrash/ClientCrashTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/clientcrash/ClientCrashTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/clientcrash/ClientCrashTest.java
index fbab2f4..25177bb 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/clientcrash/ClientCrashTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/clientcrash/ClientCrashTest.java
@@ -44,9 +44,11 @@ import org.apache.activemq.tests.util.SpawnedVMSupport;
  */
 public class ClientCrashTest extends ClientTestBase
 {
-   static final int PING_PERIOD = 2000;
+   // using short values so this test can run fast
+   static final int PING_PERIOD = 100;
 
-   static final int CONNECTION_TTL = 6000;
+   // using short values so this test can run fast
+   static final int CONNECTION_TTL = 1000;
 
    // Constants -----------------------------------------------------
 
@@ -76,18 +78,22 @@ public class ClientCrashTest extends ClientTestBase
    {
       assertActiveConnections(1);
 
+      ClientSession session = sf.createSession(false, true, true);
+      session.createQueue(ClientCrashTest.QUEUE, ClientCrashTest.QUEUE, null, false);
+
       // spawn a JVM that creates a Core client, which sends a message
+      // It has to be spawned after the queue was created.
+      // if the client is too fast you race the send before the queue was created, missing a message
       Process p = SpawnedVMSupport.spawnVM(CrashClient.class.getName());
 
-      ClientSession session = sf.createSession(false, true, true);
-      session.createQueue(ClientCrashTest.QUEUE, ClientCrashTest.QUEUE, null, false);
       ClientConsumer consumer = session.createConsumer(ClientCrashTest.QUEUE);
       ClientProducer producer = session.createProducer(ClientCrashTest.QUEUE);
 
+
       session.start();
 
       // receive a message from the queue
-      Message messageFromClient = consumer.receive(500000);
+      Message messageFromClient = consumer.receive(5000);
       Assert.assertNotNull("no message received", messageFromClient);
       Assert.assertEquals(ClientCrashTest.MESSAGE_TEXT_FROM_CLIENT, messageFromClient.getBodyBuffer().readString());
 
@@ -155,7 +161,7 @@ public class ClientCrashTest extends ClientTestBase
       session.start();
 
       // receive a message from the queue
-      ClientMessage messageFromClient = consumer.receive(10000);
+      ClientMessage messageFromClient = consumer.receive(timeout);
       Assert.assertNotNull("no message received", messageFromClient);
       Assert.assertEquals(ClientCrashTest.MESSAGE_TEXT_FROM_CLIENT, messageFromClient.getBodyBuffer().readString());
 

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/60179cfc/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/clientcrash/CrashClient.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/clientcrash/CrashClient.java b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/clientcrash/CrashClient.java
index 8a0c5d6..406ba56 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/clientcrash/CrashClient.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/clientcrash/CrashClient.java
@@ -59,8 +59,9 @@ public class CrashClient
          ClientSession session = sf.createSession(false, true, true);
          ClientProducer producer = session.createProducer(ClientCrashTest.QUEUE);
 
+         // it has to be durable otherwise it may race dying before the client is killed
          ClientMessage message = session.createMessage(ActiveMQTextMessage.TYPE,
-                                                             false,
+                                                             true,
                                                              0,
                                                              System.currentTimeMillis(),
                                                              (byte)1);

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/60179cfc/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/clientcrash/CrashClient2.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/clientcrash/CrashClient2.java b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/clientcrash/CrashClient2.java
index 15a1c07..775b50d 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/clientcrash/CrashClient2.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/clientcrash/CrashClient2.java
@@ -59,7 +59,7 @@ public class CrashClient2
          ClientSession session = sf.createSession(true, true, 1000000);
          ClientProducer producer = session.createProducer(ClientCrashTest.QUEUE2);
 
-         ClientMessage message = session.createMessage(false);
+         ClientMessage message = session.createMessage(true);
          message.getBodyBuffer().writeString(ClientCrashTest.MESSAGE_TEXT_FROM_CLIENT);
 
          producer.send(message);


[5/5] activemq-6 git commit: merging #95 - performance improvements

Posted by an...@apache.org.
merging #95 - performance improvements


Project: http://git-wip-us.apache.org/repos/asf/activemq-6/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-6/commit/b8db8b05
Tree: http://git-wip-us.apache.org/repos/asf/activemq-6/tree/b8db8b05
Diff: http://git-wip-us.apache.org/repos/asf/activemq-6/diff/b8db8b05

Branch: refs/heads/master
Commit: b8db8b05145b92dc42695436267f0285a025a559
Parents: 1d5a7a1 bace921
Author: Andy Taylor <an...@gmail.com>
Authored: Thu Feb 12 15:12:35 2015 +0000
Committer: Andy Taylor <an...@gmail.com>
Committed: Thu Feb 12 15:12:35 2015 +0000

----------------------------------------------------------------------
 .../core/buffers/impl/ChannelBufferWrapper.java |  12 +
 .../impl/ResetLimitWrappedActiveMQBuffer.java   |   4 +-
 .../activemq/core/protocol/core/Packet.java     |   2 -
 .../impl/ActiveMQClientProtocolManager.java     |   2 +-
 .../core/protocol/core/impl/PacketImpl.java     |   7 +-
 .../core/impl/RemotingConnectionImpl.java       |  37 +--
 .../core/impl/wireformat/RollbackMessage.java   |   6 -
 .../impl/wireformat/SessionCloseMessage.java    |   6 -
 .../impl/wireformat/SessionCommitMessage.java   |   5 -
 .../impl/wireformat/SessionXACommitMessage.java |   6 -
 .../wireformat/SessionXAPrepareMessage.java     |   6 -
 .../wireformat/SessionXARollbackMessage.java    |   6 -
 .../remoting/impl/netty/NettyConnection.java    |   6 +-
 .../remoting/impl/netty/NettyConnector.java     |   3 +-
 .../protocol/AbstractRemotingConnection.java    |   4 +-
 .../spi/core/protocol/RemotingConnection.java   |   3 +-
 .../activemq/spi/core/remoting/Connection.java  |   2 +-
 .../activemq/jms/client/ActiveMQSession.java    |   4 +-
 .../protocol/openwire/OpenWireConnection.java   |   2 +-
 .../core/protocol/stomp/StompConnection.java    |   2 +-
 .../core/remoting/impl/invm/InVMConnection.java |   2 +-
 .../impl/netty/NettyServerConnection.java       |   2 +-
 pom.xml                                         |   7 +-
 .../clientcrash/ClientCrashTest.java            |  18 +-
 .../integration/clientcrash/CrashClient.java    |   3 +-
 .../integration/clientcrash/CrashClient2.java   |   2 +-
 .../jms/tests/message/MessageHeaderTest.java    |   4 +-
 .../sends/AbstractSendReceivePerfTest.java      | 247 +++++++++++++++++++
 .../tests/performance/sends/ClientACKPerf.java  | 130 ++++++++++
 .../sends/MeasureCommitPerfTest.java            |  81 ++++++
 .../tests/performance/sends/PreACKPerf.java     |  93 +++++++
 .../impl/netty/NettyConnectionTest.java         |   2 +-
 32 files changed, 609 insertions(+), 107 deletions(-)
----------------------------------------------------------------------



[4/5] activemq-6 git commit: back-porting fix on exception message from hornetq code

Posted by an...@apache.org.
back-porting fix on exception message from hornetq code

This change is small enough to qualify as a tweak, so I'm not bothering to create a JIRA for it.


Project: http://git-wip-us.apache.org/repos/asf/activemq-6/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-6/commit/bace9213
Tree: http://git-wip-us.apache.org/repos/asf/activemq-6/tree/bace9213
Diff: http://git-wip-us.apache.org/repos/asf/activemq-6/diff/bace9213

Branch: refs/heads/master
Commit: bace921389150c1654f26977282df82259cc4339
Parents: 60179cf
Author: Clebert Suconic <cl...@apache.org>
Authored: Wed Feb 11 21:04:18 2015 -0500
Committer: Clebert Suconic <cl...@apache.org>
Committed: Wed Feb 11 21:04:21 2015 -0500

----------------------------------------------------------------------
 .../java/org/apache/activemq/jms/client/ActiveMQSession.java     | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-6/blob/bace9213/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQSession.java
----------------------------------------------------------------------
diff --git a/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQSession.java b/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQSession.java
index 7725ba8..a170b9e 100644
--- a/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQSession.java
+++ b/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQSession.java
@@ -767,7 +767,7 @@ public class ActiveMQSession implements QueueSession, TopicSession
             if (subscriptionName == null)
             {
                if (durability != ConsumerDurability.NON_DURABLE)
-                  throw new RuntimeException();
+                  throw new RuntimeException("Subscription name cannot be null for durable topic consumer");
                // Non durable sub
 
                queueName = new SimpleString(UUID.randomUUID().toString());
@@ -782,7 +782,7 @@ public class ActiveMQSession implements QueueSession, TopicSession
             {
                // Durable sub
                if (durability != ConsumerDurability.DURABLE)
-                  throw new RuntimeException();
+                  throw new RuntimeException("Subscription name must be null for non-durable topic consumer");
                if (connection.getClientID() == null)
                {
                   throw new IllegalStateException("Cannot create durable subscription - client ID has not been set");