You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by an...@apache.org on 2015/02/12 16:14:47 UTC
[1/5] activemq-6 git commit: ACTIVEMQ6-78 Adding tests to evaluate
this task
Repository: activemq-6
Updated Branches:
refs/heads/master 1d5a7a10d -> b8db8b051
ACTIVEMQ6-78 Adding tests to evaluate this task
https://issues.apache.org/jira/browse/ACTIVEMQ6-78
This commit only adds the tests I used to debug the blocked-calls issue.
There are also some profiling parameters, added as a comment in the pom, that you can use.
This is a separate commit so that the results of the optimizations are easier to validate by
running the tests before and after any changes.
Project: http://git-wip-us.apache.org/repos/asf/activemq-6/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-6/commit/41b28f4b
Tree: http://git-wip-us.apache.org/repos/asf/activemq-6/tree/41b28f4b
Diff: http://git-wip-us.apache.org/repos/asf/activemq-6/diff/41b28f4b
Branch: refs/heads/master
Commit: 41b28f4b23cb42bb168a7b1dfb0fb6e5d6faa053
Parents: 1d5a7a1
Author: Clebert Suconic <cl...@apache.org>
Authored: Tue Feb 10 10:53:02 2015 -0500
Committer: Clebert Suconic <cl...@apache.org>
Committed: Wed Feb 11 12:47:01 2015 -0500
----------------------------------------------------------------------
pom.xml | 7 +-
.../sends/AbstractSendReceivePerfTest.java | 247 +++++++++++++++++++
.../tests/performance/sends/ClientACKPerf.java | 130 ++++++++++
.../sends/MeasureCommitPerfTest.java | 81 ++++++
.../tests/performance/sends/PreACKPerf.java | 93 +++++++
5 files changed, 557 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/41b28f4b/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index c3ae18a..4172145 100644
--- a/pom.xml
+++ b/pom.xml
@@ -79,7 +79,12 @@
see https://intellij-support.jetbrains.com/entries/23395793
Also see: http://youtrack.jetbrains.com/issue/IDEA-125696
- -->
+
+
+ For profiling add this line and use jmc (Java Mission Control) to evaluate the results:
+ -XX:+UnlockCommercialFeatures -XX:+FlightRecorder -XX:StartFlightRecording=delay=30s,duration=120s,filename=/tmp/myrecording.jfr
+
+ -->
<activemq-surefire-argline>-Djava.util.logging.manager=org.jboss.logmanager.LogManager
-Dlogging.configuration=file:${activemq.basedir}/tests/config/logging.properties
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/41b28f4b/tests/performance-tests/src/test/java/org/apache/activemq/tests/performance/sends/AbstractSendReceivePerfTest.java
----------------------------------------------------------------------
diff --git a/tests/performance-tests/src/test/java/org/apache/activemq/tests/performance/sends/AbstractSendReceivePerfTest.java b/tests/performance-tests/src/test/java/org/apache/activemq/tests/performance/sends/AbstractSendReceivePerfTest.java
new file mode 100644
index 0000000..c8b2d7f
--- /dev/null
+++ b/tests/performance-tests/src/test/java/org/apache/activemq/tests/performance/sends/AbstractSendReceivePerfTest.java
@@ -0,0 +1,247 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.tests.performance.sends;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.activemq.api.core.TransportConfiguration;
+import org.apache.activemq.api.jms.ActiveMQJMSClient;
+import org.apache.activemq.core.settings.impl.AddressFullMessagePolicy;
+import org.apache.activemq.core.settings.impl.AddressSettings;
+import org.apache.activemq.tests.util.JMSTestBase;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Base class for the send/receive performance tests.
+ * <p>
+ * {@link #testSendReceive()} starts one receiver thread and one sender thread against
+ * {@link #Q_NAME}. Subclasses provide the consuming side through
+ * {@link #consumeMessages(Connection, String)} and may override
+ * {@link #sendMessages(Connection, String)}.
+ * <p>
+ * The test applies its own flow control: the sender takes one credit from
+ * {@code pendingCredit} before every send ({@link #beforeSend()}) and the consumer is
+ * expected to give it back after every received message ({@link #afterConsume(Message)}).
+ *
+ * @author Clebert Suconic
+ */
+public abstract class AbstractSendReceivePerfTest extends JMSTestBase
+{
+   protected static final String Q_NAME = "test-queue-01";
+
+   private static final java.util.logging.Logger LOGGER = java.util.logging.Logger.getLogger(AbstractSendReceivePerfTest.class.getName());
+
+   private Queue queue;
+
+   /** Cleared by the receiver thread when it finishes; the sending side polls it to know when to stop. */
+   protected AtomicBoolean running = new AtomicBoolean(true);
+
+   /** Credits of the test's own flow control: one is taken per send, one is returned per consumed message. */
+   final Semaphore pendingCredit = new Semaphore(5000);
+
+
+   /**
+    * Creates the test queue and replaces all address settings with a single "#" match
+    * using the BLOCK policy and a maximum size of {@code Long.MAX_VALUE} bytes.
+    */
+   @Override
+   @Before
+   public void setUp() throws Exception
+   {
+      super.setUp();
+
+      jmsServer.createQueue(false, Q_NAME, null, true, Q_NAME);
+      queue = ActiveMQJMSClient.createQueue(Q_NAME);
+
+      AddressSettings settings = new AddressSettings();
+      settings.setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK);
+      settings.setMaxSizeBytes(Long.MAX_VALUE);
+      server.getAddressSettingsRepository().clear();
+      server.getAddressSettingsRepository().addMatch("#", settings);
+   }
+
+
+   /**
+    * Registers the connection factory under "/cf" with a single connector built from
+    * {@code NETTY_CONNECTOR_FACTORY}, and looks it up again into {@code cf}.
+    */
+   @Override
+   protected void registerConnectionFactory() throws Exception
+   {
+      List<TransportConfiguration> connectorConfigs = new ArrayList<TransportConfiguration>();
+      connectorConfigs.add(new TransportConfiguration(NETTY_CONNECTOR_FACTORY));
+
+      createCF(connectorConfigs, "/cf");
+
+      cf = (ConnectionFactory) namingContext.lookup("/cf");
+   }
+
+
+   /**
+    * Runs one receiver and one sender concurrently, waits for both to finish and
+    * fails if either of them reported an error.
+    */
+   @Test
+   public void testSendReceive() throws Exception
+   {
+      MessageReceiver receiver = new MessageReceiver(Q_NAME);
+      receiver.start();
+      MessageSender sender = new MessageSender(Q_NAME);
+      sender.start();
+
+      receiver.join();
+      sender.join();
+
+      assertFalse(receiver.failed);
+      assertFalse(sender.failed);
+   }
+
+   /**
+    * To be called after a message is consumed so the flow control of the test kicks in:
+    * gives one credit back to the sender.
+    *
+    * @param message the message just received; {@code null} returns no credit
+    */
+   protected final void afterConsume(Message message)
+   {
+      if (message != null)
+      {
+         pendingCredit.release();
+      }
+   }
+
+
+   /**
+    * Blocks until a send credit is available or the test stops running.
+    * <p>
+    * When this method returns because {@code running} was cleared, no credit has been
+    * acquired; callers should re-check {@code running} before sending.
+    *
+    * @throws RuntimeException wrapping the {@link InterruptedException} if the thread is interrupted
+    */
+   protected final void beforeSend()
+   {
+      while (running.get())
+      {
+         try
+         {
+            if (pendingCredit.tryAcquire(1, TimeUnit.SECONDS))
+            {
+               return;
+            }
+
+            System.out.println("Couldn't get credits!");
+         }
+         catch (InterruptedException e)
+         {
+            // keep the interrupt status visible to whoever handles the wrapped exception
+            Thread.currentThread().interrupt();
+            throw new RuntimeException(e.getMessage(), e);
+         }
+      }
+   }
+
+
+   /** Thread that opens a connection and delegates to {@link #consumeMessages(Connection, String)}. */
+   private class MessageReceiver extends Thread
+   {
+      private final String qName;
+
+      public volatile boolean failed = false;
+
+      public MessageReceiver(String qname) throws Exception
+      {
+         super("Receiver " + qname);
+         this.qName = qname;
+      }
+
+      @Override
+      public void run()
+      {
+         try
+         {
+            LOGGER.info("Receiver: Connecting");
+            Connection c = cf.createConnection();
+            try
+            {
+               consumeMessages(c, qName);
+            }
+            finally
+            {
+               // close the connection even when consuming failed
+               c.close();
+            }
+         }
+         catch (Exception e)
+         {
+            LOGGER.log(java.util.logging.Level.SEVERE, "Receiver failed", e);
+            failed = true;
+         }
+         finally
+         {
+            // tells the sender that nobody is consuming any longer
+            running.set(false);
+         }
+      }
+   }
+
+   /**
+    * Consumes messages from the given queue; the measurement itself lives here.
+    *
+    * @param c     an open connection that has not been started yet
+    * @param qName name of the queue to consume from
+    */
+   protected abstract void consumeMessages(Connection c, String qName) throws Exception;
+
+   /** Thread that opens a connection and delegates to {@link #sendMessages(Connection, String)}. */
+   private class MessageSender extends Thread
+   {
+      protected String qName;
+
+      public volatile boolean failed = false;
+
+      public MessageSender(String qname) throws Exception
+      {
+         super("Sender " + qname);
+
+         this.qName = qname;
+      }
+
+      @Override
+      public void run()
+      {
+         try
+         {
+            LOGGER.info("Sender: Connecting");
+            Connection c = cf.createConnection();
+            try
+            {
+               sendMessages(c, qName);
+            }
+            finally
+            {
+               // close the connection even when sending failed
+               c.close();
+            }
+         }
+         catch (Exception e)
+         {
+            failed = true;
+            // beforeSend() wraps the interruption in a RuntimeException, so look at the cause as well
+            if (e instanceof InterruptedException || e.getCause() instanceof InterruptedException)
+            {
+               LOGGER.info("Sender done.");
+            }
+            else
+            {
+               LOGGER.log(java.util.logging.Level.SEVERE, "Sender failed", e);
+            }
+         }
+      }
+   }
+
+   /**
+    * Sends non-persistent text messages through an AUTO_ACKNOWLEDGE session for as long
+    * as {@code running} is set, taking one credit per message.
+    *
+    * @param c     an open connection
+    * @param qName name of the queue to send to
+    */
+   protected void sendMessages(Connection c, String qName) throws JMSException
+   {
+      Session s = c.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      LOGGER.info("Sender: Using AUTO-ACK session");
+
+      Queue q = s.createQueue(qName);
+      MessageProducer producer = s.createProducer(null);
+      producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+
+      long sent = 0;
+      while (running.get())
+      {
+         beforeSend();
+         if (!running.get())
+         {
+            // beforeSend() gave up without a credit because the receiver is gone
+            break;
+         }
+         producer.send(q, s.createTextMessage("Message_" + (sent++)));
+      }
+   }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/41b28f4b/tests/performance-tests/src/test/java/org/apache/activemq/tests/performance/sends/ClientACKPerf.java
----------------------------------------------------------------------
diff --git a/tests/performance-tests/src/test/java/org/apache/activemq/tests/performance/sends/ClientACKPerf.java b/tests/performance-tests/src/test/java/org/apache/activemq/tests/performance/sends/ClientACKPerf.java
new file mode 100644
index 0000000..6dba4f4
--- /dev/null
+++ b/tests/performance-tests/src/test/java/org/apache/activemq/tests/performance/sends/ClientACKPerf.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.tests.performance.sends;
+
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+/**
+ * Measures consumer throughput and the time spent acknowledging in
+ * {@code CLIENT_ACKNOWLEDGE} mode, acknowledging once every {@code batchSize} messages.
+ *
+ * @author clebertsuconic
+ */
+@RunWith(Parameterized.class)
+public class ClientACKPerf extends AbstractSendReceivePerfTest
+{
+
+   /** @return the batch sizes the test is run with */
+   @Parameterized.Parameters(name = "batchSize={0}")
+   public static Collection<Object[]> data()
+   {
+      List<Object[]> list = Arrays.asList(new Object[][]{
+         {1},
+         {2000}});
+
+      System.out.println("Size = " + list.size());
+      return list;
+   }
+
+   /** Number of received messages between two acknowledgements. */
+   public final int batchSize;
+
+   public ClientACKPerf(int batchSize)
+   {
+      super();
+      this.batchSize = batchSize;
+   }
+
+
+   /**
+    * Receives messages for 60 seconds, acknowledging the first message and then every
+    * {@code batchSize}-th one, and prints the rates every 10000 messages and at the end.
+    *
+    * @throws Exception if no message arrives within 5 seconds
+    */
+   @Override
+   protected void consumeMessages(Connection c, String qName) throws Exception
+   {
+      System.out.println("Receiver: Using CLIENT-ACK mode");
+
+      Session s = c.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+      Queue q = s.createQueue(qName);
+      MessageConsumer consumer = s.createConsumer(q, null, false);
+
+      c.start();
+
+      // accumulated System.nanoTime() deltas, i.e. nanoseconds
+      long totalTimeACKTime = 0;
+
+      long start = System.currentTimeMillis();
+
+      long nmessages = 0;
+      long timeout = System.currentTimeMillis() + 60 * 1000;
+      while (timeout > System.currentTimeMillis())
+      {
+         Message m = consumer.receive(5000);
+         // gives the send credit back; ignores null
+         afterConsume(m);
+
+         if (m == null)
+         {
+            throw new Exception("Failed with m = null");
+         }
+
+         if (nmessages++ % batchSize == 0)
+         {
+            long startACK = System.nanoTime();
+            m.acknowledge();
+            long endACK = System.nanoTime();
+            totalTimeACKTime += (endACK - startACK);
+         }
+
+         if (nmessages % 10000 == 0)
+         {
+            printMsgsSec(start, nmessages, totalTimeACKTime);
+         }
+      }
+
+      printMsgsSec(start, nmessages, totalTimeACKTime);
+   }
+
+
+   /**
+    * Prints the message rate and the acknowledgement timings.
+    *
+    * @param start            start of the measurement, from {@code System.currentTimeMillis()}
+    * @param nmessages        number of messages received so far
+    * @param totalTimeACKTime accumulated time spent in {@code acknowledge()}, in nanoseconds
+    */
+   protected void printMsgsSec(final long start, final double nmessages, final double totalTimeACKTime)
+   {
+      long end = System.currentTimeMillis();
+      double elapsed = ((double) end - (double) start) / 1000f;
+
+      double messagesPerSecond = nmessages / elapsed;
+      // messages 0, batchSize, 2 * batchSize, ... are acknowledged, hence the rounding up
+      double nAcks = Math.ceil(nmessages / batchSize);
+
+      System.out.println("batchSize=" + batchSize + ", numberOfMessages="
+                            + nmessages + ", elapsedTime=" + elapsed + " msgs/sec= " + messagesPerSecond + ",totalTimeAcking=" + String.format("%10.4f", totalTimeACKTime) +
+                            ", avgACKTime=" + String.format("%10.4f", (totalTimeACKTime / nAcks)));
+   }
+
+
+}
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/41b28f4b/tests/performance-tests/src/test/java/org/apache/activemq/tests/performance/sends/MeasureCommitPerfTest.java
----------------------------------------------------------------------
diff --git a/tests/performance-tests/src/test/java/org/apache/activemq/tests/performance/sends/MeasureCommitPerfTest.java b/tests/performance-tests/src/test/java/org/apache/activemq/tests/performance/sends/MeasureCommitPerfTest.java
new file mode 100644
index 0000000..0528d0e
--- /dev/null
+++ b/tests/performance-tests/src/test/java/org/apache/activemq/tests/performance/sends/MeasureCommitPerfTest.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.tests.performance.sends;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Session;
+
+
+/**
+ * Measures how many commits per second a transacted session can perform.
+ * No message is sent or consumed: every committed transaction is empty.
+ *
+ * @author clebertsuconic
+ */
+public class MeasureCommitPerfTest extends AbstractSendReceivePerfTest
+{
+   /** Nothing to consume: this test only measures commits on the sending side. */
+   @Override
+   protected void consumeMessages(Connection c, String qName) throws Exception
+   {
+   }
+
+
+   /**
+    * Commits an empty transacted session in a tight loop for 30 seconds.
+    * The first 5 seconds are a warm-up; counting restarts once they have elapsed, and
+    * the rate is printed every 1000 commits after that and once more at the end.
+    *
+    * @param c     an open connection
+    * @param qName unused, no message is sent
+    */
+   @Override
+   protected void sendMessages(Connection c, String qName) throws JMSException
+   {
+      Session s = c.createSession(true, Session.SESSION_TRANSACTED);
+      try
+      {
+         long timeout = System.currentTimeMillis() + 30 * 1000;
+
+         long startMeasure = System.currentTimeMillis() + 5000;
+         // stays 0 until the warm-up period is over
+         long start = 0;
+         long committs = 0;
+         while (timeout > System.currentTimeMillis())
+         {
+            if (start == 0 && System.currentTimeMillis() > startMeasure)
+            {
+               System.out.println("heat up");
+               start = System.currentTimeMillis();
+               committs = 0;
+            }
+
+            s.commit();
+            committs++;
+            if (start > 0 && committs % 1000 == 0)
+            {
+               printCommitsSecond(start, committs);
+            }
+         }
+         printCommitsSecond(start, committs);
+      }
+      finally
+      {
+         // release the session even when a commit fails
+         s.close();
+      }
+   }
+
+
+   /**
+    * Prints the commit rate since {@code start}.
+    *
+    * @param start    start of the measurement, from {@code System.currentTimeMillis()}
+    * @param committs number of commits performed since {@code start}
+    */
+   protected void printCommitsSecond(final long start, final double committs)
+   {
+      long end = System.currentTimeMillis();
+      double elapsed = ((double) end - (double) start) / 1000f;
+
+      double commitsPerSecond = committs / elapsed;
+
+      System.out.println("end = " + end + ", start=" + start + ", numberOfMessages="
+                            + committs + ", elapsed=" + elapsed + " msgs/sec= " + commitsPerSecond);
+   }
+
+
+}
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/41b28f4b/tests/performance-tests/src/test/java/org/apache/activemq/tests/performance/sends/PreACKPerf.java
----------------------------------------------------------------------
diff --git a/tests/performance-tests/src/test/java/org/apache/activemq/tests/performance/sends/PreACKPerf.java b/tests/performance-tests/src/test/java/org/apache/activemq/tests/performance/sends/PreACKPerf.java
new file mode 100644
index 0000000..a6d2906
--- /dev/null
+++ b/tests/performance-tests/src/test/java/org/apache/activemq/tests/performance/sends/PreACKPerf.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.tests.performance.sends;
+
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+import org.apache.activemq.api.jms.ActiveMQJMSConstants;
+
+/**
+ * Measures consumer throughput using the {@code ActiveMQJMSConstants.PRE_ACKNOWLEDGE}
+ * session mode.
+ *
+ * @author clebertsuconic
+ */
+public class PreACKPerf extends AbstractSendReceivePerfTest
+{
+   /**
+    * Receives messages for 30 seconds and prints the rate every 10000 messages and at the end.
+    *
+    * @throws Exception if no message arrives within 5 seconds
+    */
+   @Override
+   protected void consumeMessages(Connection c, String qName) throws Exception
+   {
+      System.out.println("Receiver: Using PRE-ACK mode");
+
+      Session s = c.createSession(false, ActiveMQJMSConstants.PRE_ACKNOWLEDGE);
+      Queue q = s.createQueue(qName);
+      MessageConsumer consumer = s.createConsumer(q, null, false);
+
+      c.start();
+
+      long start = System.currentTimeMillis();
+
+      long nmessages = 0;
+      long timeout = System.currentTimeMillis() + 30 * 1000;
+      while (timeout > System.currentTimeMillis())
+      {
+         Message m = consumer.receive(5000);
+         // Give the send credit back. Without this the sender runs out of credits,
+         // stops sending, and the next receive() times out with null.
+         afterConsume(m);
+
+         if (m == null)
+         {
+            throw new Exception("Failed with m = null");
+         }
+
+         nmessages++;
+
+         if (nmessages % 10000 == 0)
+         {
+            printMsgsSec(start, nmessages);
+         }
+      }
+
+      printMsgsSec(start, nmessages);
+   }
+
+
+   /**
+    * Prints the message rate since {@code start}.
+    *
+    * @param start     start of the measurement, from {@code System.currentTimeMillis()}
+    * @param nmessages number of messages received so far
+    */
+   protected void printMsgsSec(final long start, final double nmessages)
+   {
+      long end = System.currentTimeMillis();
+      double elapsed = ((double) end - (double) start) / 1000f;
+
+      double messagesPerSecond = nmessages / elapsed;
+
+      System.out.println("end = " + end + ", start=" + start + ", numberOfMessages="
+                            + nmessages + ", elapsed=" + elapsed + " msgs/sec= " + messagesPerSecond);
+   }
+
+
+}
[3/5] activemq-6 git commit: ACTIVEMQ6-78 Improving performance over
Netty NIO blocked calls
Posted by an...@apache.org.
ACTIVEMQ6-78 Improving performance over Netty NIO blocked calls
https://issues.apache.org/jira/browse/ACTIVEMQ6-78 performance work
There are two aspects to this work. The first is to avoid asynchronous packets and the
context switches over the executors. Packet had a method that made certain packets, such
as commit, use a different executor. Since it's NIO, everything is now done on the Netty thread.
The second aspect was to make sure we use the proper buffering.
Project: http://git-wip-us.apache.org/repos/asf/activemq-6/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-6/commit/f7c4d56c
Tree: http://git-wip-us.apache.org/repos/asf/activemq-6/tree/f7c4d56c
Diff: http://git-wip-us.apache.org/repos/asf/activemq-6/diff/f7c4d56c
Branch: refs/heads/master
Commit: f7c4d56cc771e2d4ff54fd2796ad2cdfae9f5e13
Parents: 41b28f4
Author: Clebert Suconic <cl...@apache.org>
Authored: Tue Feb 10 11:30:18 2015 -0500
Committer: Clebert Suconic <cl...@apache.org>
Committed: Wed Feb 11 12:47:14 2015 -0500
----------------------------------------------------------------------
.../core/buffers/impl/ChannelBufferWrapper.java | 12 +++++++
.../impl/ResetLimitWrappedActiveMQBuffer.java | 4 ++-
.../activemq/core/protocol/core/Packet.java | 2 --
.../impl/ActiveMQClientProtocolManager.java | 2 +-
.../core/protocol/core/impl/PacketImpl.java | 7 +---
.../core/impl/RemotingConnectionImpl.java | 37 ++------------------
.../core/impl/wireformat/RollbackMessage.java | 6 ----
.../impl/wireformat/SessionCloseMessage.java | 6 ----
.../impl/wireformat/SessionCommitMessage.java | 5 ---
.../impl/wireformat/SessionXACommitMessage.java | 6 ----
.../wireformat/SessionXAPrepareMessage.java | 6 ----
.../wireformat/SessionXARollbackMessage.java | 6 ----
.../remoting/impl/netty/NettyConnection.java | 6 ++--
.../remoting/impl/netty/NettyConnector.java | 3 +-
.../protocol/AbstractRemotingConnection.java | 4 +--
.../spi/core/protocol/RemotingConnection.java | 3 +-
.../activemq/spi/core/remoting/Connection.java | 2 +-
.../protocol/openwire/OpenWireConnection.java | 2 +-
.../core/protocol/stomp/StompConnection.java | 2 +-
.../core/remoting/impl/invm/InVMConnection.java | 2 +-
.../impl/netty/NettyServerConnection.java | 2 +-
.../jms/tests/message/MessageHeaderTest.java | 4 +--
.../impl/netty/NettyConnectionTest.java | 2 +-
23 files changed, 35 insertions(+), 96 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/f7c4d56c/activemq-commons/src/main/java/org/apache/activemq/core/buffers/impl/ChannelBufferWrapper.java
----------------------------------------------------------------------
diff --git a/activemq-commons/src/main/java/org/apache/activemq/core/buffers/impl/ChannelBufferWrapper.java b/activemq-commons/src/main/java/org/apache/activemq/core/buffers/impl/ChannelBufferWrapper.java
index a3fa5b5..53d7306 100644
--- a/activemq-commons/src/main/java/org/apache/activemq/core/buffers/impl/ChannelBufferWrapper.java
+++ b/activemq-commons/src/main/java/org/apache/activemq/core/buffers/impl/ChannelBufferWrapper.java
@@ -35,6 +35,18 @@ public class ChannelBufferWrapper implements ActiveMQBuffer
protected ByteBuf buffer; // NO_UCD (use final)
private final boolean releasable;
+   /**
+    * Follows {@code ByteBuf.unwrap()} repeatedly and returns the innermost buffer.
+    *
+    * @param buffer the buffer to strip, possibly a wrapper around another buffer
+    * @return the last buffer in the chain, i.e. the one whose {@code unwrap()} returns
+    *         {@code null} or itself; {@code buffer} when it wraps nothing
+    */
+   public static ByteBuf unwrap(ByteBuf buffer)
+   {
+      ByteBuf innermost = buffer;
+      // the identity check only guards against unwrap() ever returning the buffer itself
+      for (ByteBuf next = innermost.unwrap(); next != null && next != innermost; next = innermost.unwrap())
+      {
+         innermost = next;
+      }
+
+      return innermost;
+   }
+
public ChannelBufferWrapper(final ByteBuf buffer)
{
this(buffer, false);
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/f7c4d56c/activemq-core-client/src/main/java/org/apache/activemq/core/buffers/impl/ResetLimitWrappedActiveMQBuffer.java
----------------------------------------------------------------------
diff --git a/activemq-core-client/src/main/java/org/apache/activemq/core/buffers/impl/ResetLimitWrappedActiveMQBuffer.java b/activemq-core-client/src/main/java/org/apache/activemq/core/buffers/impl/ResetLimitWrappedActiveMQBuffer.java
index 96b3e2b..1cd342d 100644
--- a/activemq-core-client/src/main/java/org/apache/activemq/core/buffers/impl/ResetLimitWrappedActiveMQBuffer.java
+++ b/activemq-core-client/src/main/java/org/apache/activemq/core/buffers/impl/ResetLimitWrappedActiveMQBuffer.java
@@ -46,7 +46,9 @@ public final class ResetLimitWrappedActiveMQBuffer extends ChannelBufferWrapper
public ResetLimitWrappedActiveMQBuffer(final int limit, final ActiveMQBuffer buffer, final MessageInternal message)
{
- super(buffer.byteBuf());
+ // a wrapped inside a wrapper will increase the stack size.
+ // we fixed this here due to some profiling testing
+ super(unwrap(buffer.byteBuf()));
this.limit = limit;
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/f7c4d56c/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/Packet.java
----------------------------------------------------------------------
diff --git a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/Packet.java b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/Packet.java
index 4027c67..6b23bff 100644
--- a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/Packet.java
+++ b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/Packet.java
@@ -85,6 +85,4 @@ public interface Packet
* @return true if confirmation is required
*/
boolean isRequiresConfirmations();
-
- boolean isAsyncExec();
}
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/f7c4d56c/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/ActiveMQClientProtocolManager.java
----------------------------------------------------------------------
diff --git a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/ActiveMQClientProtocolManager.java b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/ActiveMQClientProtocolManager.java
index b7366df..890e8d0 100644
--- a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/ActiveMQClientProtocolManager.java
+++ b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/ActiveMQClientProtocolManager.java
@@ -482,7 +482,7 @@ public class ActiveMQClientProtocolManager implements ClientProtocolManager
{
// no need to send handshake on inVM as inVM is not using the NettyProtocolHandling
String handshake = "HORNETQ";
- ActiveMQBuffer amqbuffer = connection.createBuffer(handshake.length());
+ ActiveMQBuffer amqbuffer = connection.createTransportBuffer(handshake.length());
amqbuffer.writeBytes(handshake.getBytes());
transportConnection.write(amqbuffer);
}
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/f7c4d56c/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/PacketImpl.java
----------------------------------------------------------------------
diff --git a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/PacketImpl.java b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/PacketImpl.java
index e2bdc28..61e0eca 100644
--- a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/PacketImpl.java
+++ b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/PacketImpl.java
@@ -276,7 +276,7 @@ public class PacketImpl implements Packet
public ActiveMQBuffer encode(final RemotingConnection connection)
{
- ActiveMQBuffer buffer = connection.createBuffer(PacketImpl.INITIAL_PACKET_SIZE);
+ ActiveMQBuffer buffer = connection.createTransportBuffer(PacketImpl.INITIAL_PACKET_SIZE);
// The standard header fields
@@ -333,11 +333,6 @@ public class PacketImpl implements Packet
return true;
}
- public boolean isAsyncExec()
- {
- return false;
- }
-
@Override
public String toString()
{
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/f7c4d56c/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/RemotingConnectionImpl.java
----------------------------------------------------------------------
diff --git a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/RemotingConnectionImpl.java b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/RemotingConnectionImpl.java
index 5ef4cfd..8820850 100644
--- a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/RemotingConnectionImpl.java
+++ b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/RemotingConnectionImpl.java
@@ -81,8 +81,6 @@ public class RemotingConnectionImpl extends AbstractRemotingConnection implement
private final Object failLock = new Object();
- private volatile boolean executing;
-
private final SimpleString nodeID;
private String clientID;
@@ -381,39 +379,8 @@ public class RemotingConnectionImpl extends AbstractRemotingConnection implement
ActiveMQClientLogger.LOGGER.trace("handling packet " + packet);
}
- if (packet.isAsyncExec() && executor != null)
- {
- executing = true;
-
- executor.execute(new Runnable()
- {
- public void run()
- {
- try
- {
- doBufferReceived(packet);
- }
- catch (Throwable t)
- {
- ActiveMQClientLogger.LOGGER.errorHandlingPacket(t, packet);
- }
-
- executing = false;
- }
- });
- }
- else
- {
- //To prevent out of order execution if interleaving sync and async operations on same connection
- while (executing)
- {
- Thread.yield();
- }
-
- // Pings must always be handled out of band so we can send pings back to the client quickly
- // otherwise they would get in the queue with everything else which might give an intolerable delay
- doBufferReceived(packet);
- }
+ dataReceived = true;
+ doBufferReceived(packet);
super.bufferReceived(connectionID, buffer);
}
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/f7c4d56c/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/RollbackMessage.java
----------------------------------------------------------------------
diff --git a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/RollbackMessage.java b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/RollbackMessage.java
index 41c5735..340c73a 100644
--- a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/RollbackMessage.java
+++ b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/RollbackMessage.java
@@ -70,12 +70,6 @@ public class RollbackMessage extends PacketImpl
}
@Override
- public boolean isAsyncExec()
- {
- return true;
- }
-
- @Override
public int hashCode()
{
final int prime = 31;
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/f7c4d56c/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/SessionCloseMessage.java
----------------------------------------------------------------------
diff --git a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/SessionCloseMessage.java b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/SessionCloseMessage.java
index 1c8a276..dc61860 100644
--- a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/SessionCloseMessage.java
+++ b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/SessionCloseMessage.java
@@ -49,10 +49,4 @@ public class SessionCloseMessage extends PacketImpl
// TODO
return 0;
}
-
- @Override
- public boolean isAsyncExec()
- {
- return true;
- }
}
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/f7c4d56c/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/SessionCommitMessage.java
----------------------------------------------------------------------
diff --git a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/SessionCommitMessage.java b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/SessionCommitMessage.java
index c7242fb..1b1e081 100644
--- a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/SessionCommitMessage.java
+++ b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/SessionCommitMessage.java
@@ -30,9 +30,4 @@ public class SessionCommitMessage extends PacketImpl
super(SESS_COMMIT);
}
- @Override
- public boolean isAsyncExec()
- {
- return true;
- }
}
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/f7c4d56c/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/SessionXACommitMessage.java
----------------------------------------------------------------------
diff --git a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/SessionXACommitMessage.java b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/SessionXACommitMessage.java
index 65fdf33..14668cb 100644
--- a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/SessionXACommitMessage.java
+++ b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/SessionXACommitMessage.java
@@ -56,12 +56,6 @@ public class SessionXACommitMessage extends PacketImpl
}
@Override
- public boolean isAsyncExec()
- {
- return true;
- }
-
- @Override
public void encodeRest(final ActiveMQBuffer buffer)
{
XidCodecSupport.encodeXid(xid, buffer);
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/f7c4d56c/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/SessionXAPrepareMessage.java
----------------------------------------------------------------------
diff --git a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/SessionXAPrepareMessage.java b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/SessionXAPrepareMessage.java
index b9a531a..8ff5b15 100644
--- a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/SessionXAPrepareMessage.java
+++ b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/SessionXAPrepareMessage.java
@@ -62,12 +62,6 @@ public class SessionXAPrepareMessage extends PacketImpl
}
@Override
- public boolean isAsyncExec()
- {
- return true;
- }
-
- @Override
public int hashCode()
{
final int prime = 31;
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/f7c4d56c/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/SessionXARollbackMessage.java
----------------------------------------------------------------------
diff --git a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/SessionXARollbackMessage.java b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/SessionXARollbackMessage.java
index 8efab01..272386d 100644
--- a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/SessionXARollbackMessage.java
+++ b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/SessionXARollbackMessage.java
@@ -63,12 +63,6 @@ public class SessionXARollbackMessage extends PacketImpl
}
@Override
- public boolean isAsyncExec()
- {
- return true;
- }
-
- @Override
public int hashCode()
{
final int prime = 31;
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/f7c4d56c/activemq-core-client/src/main/java/org/apache/activemq/core/remoting/impl/netty/NettyConnection.java
----------------------------------------------------------------------
diff --git a/activemq-core-client/src/main/java/org/apache/activemq/core/remoting/impl/netty/NettyConnection.java b/activemq-core-client/src/main/java/org/apache/activemq/core/remoting/impl/netty/NettyConnection.java
index a73aa1b..c7eafe7 100644
--- a/activemq-core-client/src/main/java/org/apache/activemq/core/remoting/impl/netty/NettyConnection.java
+++ b/activemq-core-client/src/main/java/org/apache/activemq/core/remoting/impl/netty/NettyConnection.java
@@ -172,9 +172,9 @@ public class NettyConnection implements Connection
listener.connectionDestroyed(getID());
}
- public ActiveMQBuffer createBuffer(final int size)
+ public ActiveMQBuffer createTransportBuffer(final int size)
{
- return new ChannelBufferWrapper(channel.alloc().buffer(size));
+ return new ChannelBufferWrapper(PartialPooledByteBufAllocator.INSTANCE.directBuffer(size), true);
}
public Object getID()
@@ -199,7 +199,7 @@ public class NettyConnection implements Connection
{
channel.writeAndFlush(batchBuffer.byteBuf());
- batchBuffer = createBuffer(BATCHING_BUFFER_SIZE);
+ batchBuffer = createTransportBuffer(BATCHING_BUFFER_SIZE);
}
}
finally
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/f7c4d56c/activemq-core-client/src/main/java/org/apache/activemq/core/remoting/impl/netty/NettyConnector.java
----------------------------------------------------------------------
diff --git a/activemq-core-client/src/main/java/org/apache/activemq/core/remoting/impl/netty/NettyConnector.java b/activemq-core-client/src/main/java/org/apache/activemq/core/remoting/impl/netty/NettyConnector.java
index 3ed9e87..8425edd 100644
--- a/activemq-core-client/src/main/java/org/apache/activemq/core/remoting/impl/netty/NettyConnector.java
+++ b/activemq-core-client/src/main/java/org/apache/activemq/core/remoting/impl/netty/NettyConnector.java
@@ -47,7 +47,6 @@ import java.util.concurrent.TimeUnit;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
-import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelFuture;
@@ -477,7 +476,7 @@ public class NettyConnector extends AbstractConnector
}
bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
bootstrap.option(ChannelOption.SO_REUSEADDR, true);
- bootstrap.option(ChannelOption.ALLOCATOR, new UnpooledByteBufAllocator(false));
+ bootstrap.option(ChannelOption.ALLOCATOR, PartialPooledByteBufAllocator.INSTANCE);
channelGroup = new DefaultChannelGroup("activemq-connector", GlobalEventExecutor.INSTANCE);
final SSLContext context;
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/f7c4d56c/activemq-core-client/src/main/java/org/apache/activemq/spi/core/protocol/AbstractRemotingConnection.java
----------------------------------------------------------------------
diff --git a/activemq-core-client/src/main/java/org/apache/activemq/spi/core/protocol/AbstractRemotingConnection.java b/activemq-core-client/src/main/java/org/apache/activemq/spi/core/protocol/AbstractRemotingConnection.java
index a48845f..22b26ee 100644
--- a/activemq-core-client/src/main/java/org/apache/activemq/spi/core/protocol/AbstractRemotingConnection.java
+++ b/activemq-core-client/src/main/java/org/apache/activemq/spi/core/protocol/AbstractRemotingConnection.java
@@ -182,9 +182,9 @@ public abstract class AbstractRemotingConnection implements RemotingConnection
closeListeners.addAll(listeners);
}
- public ActiveMQBuffer createBuffer(final int size)
+ public ActiveMQBuffer createTransportBuffer(final int size)
{
- return transportConnection.createBuffer(size);
+ return transportConnection.createTransportBuffer(size);
}
public Connection getTransportConnection()
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/f7c4d56c/activemq-core-client/src/main/java/org/apache/activemq/spi/core/protocol/RemotingConnection.java
----------------------------------------------------------------------
diff --git a/activemq-core-client/src/main/java/org/apache/activemq/spi/core/protocol/RemotingConnection.java b/activemq-core-client/src/main/java/org/apache/activemq/spi/core/protocol/RemotingConnection.java
index 186e098..9eb287e 100644
--- a/activemq-core-client/src/main/java/org/apache/activemq/spi/core/protocol/RemotingConnection.java
+++ b/activemq-core-client/src/main/java/org/apache/activemq/spi/core/protocol/RemotingConnection.java
@@ -115,11 +115,12 @@ public interface RemotingConnection extends BufferHandler
/**
* creates a new ActiveMQBuffer of the specified size.
+ * For the purpose of i/o outgoing packets
*
* @param size the size of buffer required
* @return the buffer
*/
- ActiveMQBuffer createBuffer(int size);
+ ActiveMQBuffer createTransportBuffer(int size);
/**
* called when the underlying connection fails.
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/f7c4d56c/activemq-core-client/src/main/java/org/apache/activemq/spi/core/remoting/Connection.java
----------------------------------------------------------------------
diff --git a/activemq-core-client/src/main/java/org/apache/activemq/spi/core/remoting/Connection.java b/activemq-core-client/src/main/java/org/apache/activemq/spi/core/remoting/Connection.java
index a4896e2..b6060d3 100644
--- a/activemq-core-client/src/main/java/org/apache/activemq/spi/core/remoting/Connection.java
+++ b/activemq-core-client/src/main/java/org/apache/activemq/spi/core/remoting/Connection.java
@@ -36,7 +36,7 @@ public interface Connection
* @param size the size of buffer to create
* @return the new buffer.
*/
- ActiveMQBuffer createBuffer(int size);
+ ActiveMQBuffer createTransportBuffer(int size);
RemotingConnection getProtocolConnection();
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/f7c4d56c/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/OpenWireConnection.java
----------------------------------------------------------------------
diff --git a/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/OpenWireConnection.java b/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/OpenWireConnection.java
index 20762e2..bf906bc 100644
--- a/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/OpenWireConnection.java
+++ b/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/OpenWireConnection.java
@@ -440,7 +440,7 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor
}
@Override
- public ActiveMQBuffer createBuffer(int size)
+ public ActiveMQBuffer createTransportBuffer(int size)
{
return ActiveMQBuffers.dynamicBuffer(size);
}
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/f7c4d56c/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/StompConnection.java
----------------------------------------------------------------------
diff --git a/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/StompConnection.java b/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/StompConnection.java
index 16cf55e..9a4e7b7 100644
--- a/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/StompConnection.java
+++ b/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/StompConnection.java
@@ -280,7 +280,7 @@ public final class StompConnection implements RemotingConnection
}
@Override
- public ActiveMQBuffer createBuffer(int size)
+ public ActiveMQBuffer createTransportBuffer(int size)
{
return ActiveMQBuffers.dynamicBuffer(size);
}
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/f7c4d56c/activemq-server/src/main/java/org/apache/activemq/core/remoting/impl/invm/InVMConnection.java
----------------------------------------------------------------------
diff --git a/activemq-server/src/main/java/org/apache/activemq/core/remoting/impl/invm/InVMConnection.java b/activemq-server/src/main/java/org/apache/activemq/core/remoting/impl/invm/InVMConnection.java
index eed1ff4..37e2acb 100644
--- a/activemq-server/src/main/java/org/apache/activemq/core/remoting/impl/invm/InVMConnection.java
+++ b/activemq-server/src/main/java/org/apache/activemq/core/remoting/impl/invm/InVMConnection.java
@@ -142,7 +142,7 @@ public class InVMConnection implements Connection
}
}
- public ActiveMQBuffer createBuffer(final int size)
+ public ActiveMQBuffer createTransportBuffer(final int size)
{
return ActiveMQBuffers.dynamicBuffer(size);
}
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/f7c4d56c/activemq-server/src/main/java/org/apache/activemq/core/remoting/impl/netty/NettyServerConnection.java
----------------------------------------------------------------------
diff --git a/activemq-server/src/main/java/org/apache/activemq/core/remoting/impl/netty/NettyServerConnection.java b/activemq-server/src/main/java/org/apache/activemq/core/remoting/impl/netty/NettyServerConnection.java
index d899e85..339b407 100644
--- a/activemq-server/src/main/java/org/apache/activemq/core/remoting/impl/netty/NettyServerConnection.java
+++ b/activemq-server/src/main/java/org/apache/activemq/core/remoting/impl/netty/NettyServerConnection.java
@@ -34,7 +34,7 @@ public class NettyServerConnection extends NettyConnection
}
@Override
- public ActiveMQBuffer createBuffer(int size)
+ public ActiveMQBuffer createTransportBuffer(int size)
{
return new ChannelBufferWrapper(channel.alloc().directBuffer(size), true);
}
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/f7c4d56c/tests/jms-tests/src/test/java/org/apache/activemq/jms/tests/message/MessageHeaderTest.java
----------------------------------------------------------------------
diff --git a/tests/jms-tests/src/test/java/org/apache/activemq/jms/tests/message/MessageHeaderTest.java b/tests/jms-tests/src/test/java/org/apache/activemq/jms/tests/message/MessageHeaderTest.java
index 784a0c0..cfbf995 100644
--- a/tests/jms-tests/src/test/java/org/apache/activemq/jms/tests/message/MessageHeaderTest.java
+++ b/tests/jms-tests/src/test/java/org/apache/activemq/jms/tests/message/MessageHeaderTest.java
@@ -1391,7 +1391,7 @@ public class MessageHeaderTest extends MessageHeaderTestBase
}
/* (non-Javadoc)
- * @see org.apache.activemq.api.core.client.ClientSession#createBuffer(byte[])
+ * @see org.apache.activemq.api.core.client.ClientSession#createTransportBuffer(byte[])
*/
public ActiveMQBuffer createBuffer(final byte[] bytes)
{
@@ -1400,7 +1400,7 @@ public class MessageHeaderTest extends MessageHeaderTestBase
}
/* (non-Javadoc)
- * @see org.apache.activemq.api.core.client.ClientSession#createBuffer(int)
+ * @see org.apache.activemq.api.core.client.ClientSession#createTransportBuffer(int)
*/
public ActiveMQBuffer createBuffer(final int size)
{
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/f7c4d56c/tests/unit-tests/src/test/java/org/apache/activemq/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java
----------------------------------------------------------------------
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java
index cc9d57b..b0a4c14 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java
@@ -74,7 +74,7 @@ public class NettyConnectionTest extends UnitTestCase
final int size = 1234;
- ActiveMQBuffer buff = conn.createBuffer(size);
+ ActiveMQBuffer buff = conn.createTransportBuffer(size);
buff.writeByte((byte) 0x00); // Netty buffer does lazy initialization.
Assert.assertEquals(size, buff.capacity());
[2/5] activemq-6 git commit: Fixing test failure
Posted by an...@apache.org.
Fixing test failure
This test started to fail after the performance improvements from ACTIVEMQ6-78.
After some investigation, it turned out that the test was racing with the client:
the client could crash even before the queue was created, or it could send a
non-persistent message asynchronously and crash before the server received the message.
I've also decreased some of the ping times so the test can run a bit faster.
Project: http://git-wip-us.apache.org/repos/asf/activemq-6/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-6/commit/60179cfc
Tree: http://git-wip-us.apache.org/repos/asf/activemq-6/tree/60179cfc
Diff: http://git-wip-us.apache.org/repos/asf/activemq-6/diff/60179cfc
Branch: refs/heads/master
Commit: 60179cfc980fba4756d588c3a9d8de44be3e9960
Parents: f7c4d56
Author: Clebert Suconic <cl...@apache.org>
Authored: Wed Feb 11 08:36:17 2015 -0500
Committer: Clebert Suconic <cl...@apache.org>
Committed: Wed Feb 11 12:47:14 2015 -0500
----------------------------------------------------------------------
.../integration/clientcrash/ClientCrashTest.java | 18 ++++++++++++------
.../integration/clientcrash/CrashClient.java | 3 ++-
.../integration/clientcrash/CrashClient2.java | 2 +-
3 files changed, 15 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/60179cfc/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/clientcrash/ClientCrashTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/clientcrash/ClientCrashTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/clientcrash/ClientCrashTest.java
index fbab2f4..25177bb 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/clientcrash/ClientCrashTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/clientcrash/ClientCrashTest.java
@@ -44,9 +44,11 @@ import org.apache.activemq.tests.util.SpawnedVMSupport;
*/
public class ClientCrashTest extends ClientTestBase
{
- static final int PING_PERIOD = 2000;
+ // using short values so this test can run fast
+ static final int PING_PERIOD = 100;
- static final int CONNECTION_TTL = 6000;
+ // using short values so this test can run fast
+ static final int CONNECTION_TTL = 1000;
// Constants -----------------------------------------------------
@@ -76,18 +78,22 @@ public class ClientCrashTest extends ClientTestBase
{
assertActiveConnections(1);
+ ClientSession session = sf.createSession(false, true, true);
+ session.createQueue(ClientCrashTest.QUEUE, ClientCrashTest.QUEUE, null, false);
+
// spawn a JVM that creates a Core client, which sends a message
+ // It has to be spawned after the queue was created.
+ // if the client is too fast you race the send before the queue was created, missing a message
Process p = SpawnedVMSupport.spawnVM(CrashClient.class.getName());
- ClientSession session = sf.createSession(false, true, true);
- session.createQueue(ClientCrashTest.QUEUE, ClientCrashTest.QUEUE, null, false);
ClientConsumer consumer = session.createConsumer(ClientCrashTest.QUEUE);
ClientProducer producer = session.createProducer(ClientCrashTest.QUEUE);
+
session.start();
// receive a message from the queue
- Message messageFromClient = consumer.receive(500000);
+ Message messageFromClient = consumer.receive(5000);
Assert.assertNotNull("no message received", messageFromClient);
Assert.assertEquals(ClientCrashTest.MESSAGE_TEXT_FROM_CLIENT, messageFromClient.getBodyBuffer().readString());
@@ -155,7 +161,7 @@ public class ClientCrashTest extends ClientTestBase
session.start();
// receive a message from the queue
- ClientMessage messageFromClient = consumer.receive(10000);
+ ClientMessage messageFromClient = consumer.receive(timeout);
Assert.assertNotNull("no message received", messageFromClient);
Assert.assertEquals(ClientCrashTest.MESSAGE_TEXT_FROM_CLIENT, messageFromClient.getBodyBuffer().readString());
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/60179cfc/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/clientcrash/CrashClient.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/clientcrash/CrashClient.java b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/clientcrash/CrashClient.java
index 8a0c5d6..406ba56 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/clientcrash/CrashClient.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/clientcrash/CrashClient.java
@@ -59,8 +59,9 @@ public class CrashClient
ClientSession session = sf.createSession(false, true, true);
ClientProducer producer = session.createProducer(ClientCrashTest.QUEUE);
+ // it has to be durable otherwise it may race dying before the client is killed
ClientMessage message = session.createMessage(ActiveMQTextMessage.TYPE,
- false,
+ true,
0,
System.currentTimeMillis(),
(byte)1);
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/60179cfc/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/clientcrash/CrashClient2.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/clientcrash/CrashClient2.java b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/clientcrash/CrashClient2.java
index 15a1c07..775b50d 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/clientcrash/CrashClient2.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/clientcrash/CrashClient2.java
@@ -59,7 +59,7 @@ public class CrashClient2
ClientSession session = sf.createSession(true, true, 1000000);
ClientProducer producer = session.createProducer(ClientCrashTest.QUEUE2);
- ClientMessage message = session.createMessage(false);
+ ClientMessage message = session.createMessage(true);
message.getBodyBuffer().writeString(ClientCrashTest.MESSAGE_TEXT_FROM_CLIENT);
producer.send(message);
[5/5] activemq-6 git commit: merging #95 - performance improvements
Posted by an...@apache.org.
merging #95 - performance improvements
Project: http://git-wip-us.apache.org/repos/asf/activemq-6/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-6/commit/b8db8b05
Tree: http://git-wip-us.apache.org/repos/asf/activemq-6/tree/b8db8b05
Diff: http://git-wip-us.apache.org/repos/asf/activemq-6/diff/b8db8b05
Branch: refs/heads/master
Commit: b8db8b05145b92dc42695436267f0285a025a559
Parents: 1d5a7a1 bace921
Author: Andy Taylor <an...@gmail.com>
Authored: Thu Feb 12 15:12:35 2015 +0000
Committer: Andy Taylor <an...@gmail.com>
Committed: Thu Feb 12 15:12:35 2015 +0000
----------------------------------------------------------------------
.../core/buffers/impl/ChannelBufferWrapper.java | 12 +
.../impl/ResetLimitWrappedActiveMQBuffer.java | 4 +-
.../activemq/core/protocol/core/Packet.java | 2 -
.../impl/ActiveMQClientProtocolManager.java | 2 +-
.../core/protocol/core/impl/PacketImpl.java | 7 +-
.../core/impl/RemotingConnectionImpl.java | 37 +--
.../core/impl/wireformat/RollbackMessage.java | 6 -
.../impl/wireformat/SessionCloseMessage.java | 6 -
.../impl/wireformat/SessionCommitMessage.java | 5 -
.../impl/wireformat/SessionXACommitMessage.java | 6 -
.../wireformat/SessionXAPrepareMessage.java | 6 -
.../wireformat/SessionXARollbackMessage.java | 6 -
.../remoting/impl/netty/NettyConnection.java | 6 +-
.../remoting/impl/netty/NettyConnector.java | 3 +-
.../protocol/AbstractRemotingConnection.java | 4 +-
.../spi/core/protocol/RemotingConnection.java | 3 +-
.../activemq/spi/core/remoting/Connection.java | 2 +-
.../activemq/jms/client/ActiveMQSession.java | 4 +-
.../protocol/openwire/OpenWireConnection.java | 2 +-
.../core/protocol/stomp/StompConnection.java | 2 +-
.../core/remoting/impl/invm/InVMConnection.java | 2 +-
.../impl/netty/NettyServerConnection.java | 2 +-
pom.xml | 7 +-
.../clientcrash/ClientCrashTest.java | 18 +-
.../integration/clientcrash/CrashClient.java | 3 +-
.../integration/clientcrash/CrashClient2.java | 2 +-
.../jms/tests/message/MessageHeaderTest.java | 4 +-
.../sends/AbstractSendReceivePerfTest.java | 247 +++++++++++++++++++
.../tests/performance/sends/ClientACKPerf.java | 130 ++++++++++
.../sends/MeasureCommitPerfTest.java | 81 ++++++
.../tests/performance/sends/PreACKPerf.java | 93 +++++++
.../impl/netty/NettyConnectionTest.java | 2 +-
32 files changed, 609 insertions(+), 107 deletions(-)
----------------------------------------------------------------------
[4/5] activemq-6 git commit: back-porting fix on exception message
from hornetq code
Posted by an...@apache.org.
back-porting fix on exception message from hornetq code
This is such a small change that it qualifies as a tweak, so I'm not bothering to create a JIRA for it.
Project: http://git-wip-us.apache.org/repos/asf/activemq-6/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-6/commit/bace9213
Tree: http://git-wip-us.apache.org/repos/asf/activemq-6/tree/bace9213
Diff: http://git-wip-us.apache.org/repos/asf/activemq-6/diff/bace9213
Branch: refs/heads/master
Commit: bace921389150c1654f26977282df82259cc4339
Parents: 60179cf
Author: Clebert Suconic <cl...@apache.org>
Authored: Wed Feb 11 21:04:18 2015 -0500
Committer: Clebert Suconic <cl...@apache.org>
Committed: Wed Feb 11 21:04:21 2015 -0500
----------------------------------------------------------------------
.../java/org/apache/activemq/jms/client/ActiveMQSession.java | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/bace9213/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQSession.java
----------------------------------------------------------------------
diff --git a/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQSession.java b/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQSession.java
index 7725ba8..a170b9e 100644
--- a/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQSession.java
+++ b/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQSession.java
@@ -767,7 +767,7 @@ public class ActiveMQSession implements QueueSession, TopicSession
if (subscriptionName == null)
{
if (durability != ConsumerDurability.NON_DURABLE)
- throw new RuntimeException();
+ throw new RuntimeException("Subscription name cannot be null for durable topic consumer");
// Non durable sub
queueName = new SimpleString(UUID.randomUUID().toString());
@@ -782,7 +782,7 @@ public class ActiveMQSession implements QueueSession, TopicSession
{
// Durable sub
if (durability != ConsumerDurability.DURABLE)
- throw new RuntimeException();
+ throw new RuntimeException("Subscription name must be null for non-durable topic consumer");
if (connection.getClientID() == null)
{
throw new IllegalStateException("Cannot create durable subscription - client ID has not been set");