You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2017/08/08 14:20:17 UTC
[2/2] activemq-artemis git commit: ARTEMIS-1308: Make acknowlegde in
AcitveMQMessage non blocking
ARTEMIS-1308: Make acknowlegde in AcitveMQMessage non blocking
Allow commit within the acknowledge to be non blocking (batch) this toggles the on the existing blockonacknowlegde config.
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/7b40abea
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/7b40abea
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/7b40abea
Branch: refs/heads/master
Commit: 7b40abead95b36e5769769373d6f7bab8e34dde9
Parents: 88f78d9
Author: Michael Andre Pearce <Mi...@me.com>
Authored: Fri Jul 28 14:27:29 2017 +0100
Committer: Clebert Suconic <cl...@apache.org>
Committed: Tue Aug 8 10:07:15 2017 -0400
----------------------------------------------------------------------
.../artemis/api/core/client/ClientSession.java | 10 ++-
.../core/client/impl/ClientSessionImpl.java | 10 ++-
.../core/impl/ActiveMQSessionContext.java | 9 +++
.../spi/core/remoting/SessionContext.java | 3 +
.../artemis/jms/client/ActiveMQConnection.java | 7 +-
.../artemis/jms/client/ActiveMQMessage.java | 15 ++++-
.../jms/client/ActiveMQMessageConsumer.java | 3 +
.../jms/client/JMSMessageListenerWrapper.java | 9 ++-
.../jms/consumer/JmsConsumerTest.java | 9 ++-
.../artemis/jms/tests/AcknowledgementTest.java | 69 ++++++++++++++++++++
.../jms/tests/message/MessageHeaderTest.java | 4 ++
11 files changed, 137 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7b40abea/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientSession.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientSession.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientSession.java
index c3d6749..ab59eb6 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientSession.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientSession.java
@@ -894,13 +894,21 @@ public interface ClientSession extends XAResource, AutoCloseable {
boolean isXA();
/**
- * Commits the current transaction.
+ * Commits the current transaction, blocking.
*
* @throws ActiveMQException if an exception occurs while committing the transaction
*/
void commit() throws ActiveMQException;
/**
+ * Commits the current transaction.
+ *
+ * @param block if the commit will be blocking or not.
+ * @throws ActiveMQException if an exception occurs while committing the transaction
+ */
+ void commit(boolean block) throws ActiveMQException;
+
+ /**
* Rolls back the current transaction.
*
* @throws ActiveMQException if an exception occurs while rolling back the transaction
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7b40abea/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
index 5f6b40b..ef4e87c 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
@@ -761,6 +761,11 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
@Override
public void commit() throws ActiveMQException {
+ commit(true);
+ }
+
+ @Override
+ public void commit(boolean block) throws ActiveMQException {
checkClosed();
if (logger.isTraceEnabled()) {
@@ -782,8 +787,9 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
if (rollbackOnly) {
rollbackOnFailover(true);
}
+ startCall();
try {
- sessionContext.simpleCommit();
+ sessionContext.simpleCommit(block);
} catch (ActiveMQException e) {
if (e.getType() == ActiveMQExceptionType.UNBLOCKED || e.getType() == ActiveMQExceptionType.CONNECTION_TIMEDOUT || rollbackOnly) {
// The call to commit was unlocked on failover, we therefore rollback the tx,
@@ -794,6 +800,8 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
} else {
throw e;
}
+ } finally {
+ endCall();
}
//oops, we have failed over during the commit and don't know what happened
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7b40abea/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
index fc43672..d0d75ac 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
@@ -349,6 +349,15 @@ public class ActiveMQSessionContext extends SessionContext {
}
@Override
+ public void simpleCommit(boolean block) throws ActiveMQException {
+ if (block) {
+ sessionChannel.sendBlocking(new PacketImpl(PacketImpl.SESS_COMMIT), PacketImpl.NULL_RESPONSE);
+ } else {
+ sessionChannel.sendBatched(new PacketImpl(PacketImpl.SESS_COMMIT));
+ }
+ }
+
+ @Override
public void simpleRollback(boolean lastMessageAsDelivered) throws ActiveMQException {
sessionChannel.sendBlocking(new RollbackMessage(lastMessageAsDelivered), PacketImpl.NULL_RESPONSE);
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7b40abea/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java
index b123960..78135a8 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java
@@ -214,6 +214,9 @@ public abstract class SessionContext {
public abstract void simpleCommit() throws ActiveMQException;
+ public abstract void simpleCommit(boolean block) throws ActiveMQException;
+
+
/**
* If we are doing a simple rollback on the RA, we need to ack the last message sent to the consumer,
* otherwise DLQ won't work.
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7b40abea/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQConnection.java
----------------------------------------------------------------------
diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQConnection.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQConnection.java
index 6432af2..bf0d236 100644
--- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQConnection.java
+++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQConnection.java
@@ -597,7 +597,8 @@ public class ActiveMQConnection extends ActiveMQConnectionForContextImpl impleme
try {
ClientSession session;
-
+ boolean isBlockOnAcknowledge = sessionFactory.getServerLocator().isBlockOnAcknowledge();
+ int ackBatchSize = sessionFactory.getServerLocator().getAckBatchSize();
if (acknowledgeMode == Session.SESSION_TRANSACTED) {
session = sessionFactory.createSession(username, password, isXA, false, false, sessionFactory.getServerLocator().isPreAcknowledge(), transactionBatchSize);
} else if (acknowledgeMode == Session.AUTO_ACKNOWLEDGE) {
@@ -605,9 +606,9 @@ public class ActiveMQConnection extends ActiveMQConnectionForContextImpl impleme
} else if (acknowledgeMode == Session.DUPS_OK_ACKNOWLEDGE) {
session = sessionFactory.createSession(username, password, isXA, true, true, sessionFactory.getServerLocator().isPreAcknowledge(), dupsOKBatchSize);
} else if (acknowledgeMode == Session.CLIENT_ACKNOWLEDGE) {
- session = sessionFactory.createSession(username, password, isXA, true, false, sessionFactory.getServerLocator().isPreAcknowledge(), transactionBatchSize);
+ session = sessionFactory.createSession(username, password, isXA, true, false, sessionFactory.getServerLocator().isPreAcknowledge(), isBlockOnAcknowledge ? transactionBatchSize : ackBatchSize);
} else if (acknowledgeMode == ActiveMQJMSConstants.INDIVIDUAL_ACKNOWLEDGE) {
- session = sessionFactory.createSession(username, password, isXA, true, false, false, transactionBatchSize);
+ session = sessionFactory.createSession(username, password, isXA, true, false, false, isBlockOnAcknowledge ? transactionBatchSize : ackBatchSize);
} else if (acknowledgeMode == ActiveMQJMSConstants.PRE_ACKNOWLEDGE) {
session = sessionFactory.createSession(username, password, isXA, true, false, true, transactionBatchSize);
} else {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7b40abea/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java
----------------------------------------------------------------------
diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java
index f13f602..928d375 100644
--- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java
+++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java
@@ -43,6 +43,7 @@ import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.jms.ActiveMQJMSConstants;
+import org.apache.activemq.artemis.core.client.ActiveMQClientMessageBundle;
import org.apache.activemq.artemis.core.client.impl.ClientMessageInternal;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.reader.MessageUtil;
@@ -200,6 +201,8 @@ public class ActiveMQMessage implements javax.jms.Message {
private boolean individualAck;
+ private boolean clientAck;
+
private long jmsDeliveryTime;
// Constructors --------------------------------------------------
@@ -710,11 +713,15 @@ public class ActiveMQMessage implements javax.jms.Message {
public void acknowledge() throws JMSException {
if (session != null) {
try {
+ if (session.isClosed()) {
+ throw ActiveMQClientMessageBundle.BUNDLE.sessionClosed();
+ }
if (individualAck) {
message.individualAcknowledge();
}
-
- session.commit();
+ if (clientAck || individualAck) {
+ session.commit(session.isBlockOnAcknowledge());
+ }
} catch (ActiveMQException e) {
throw JMSExceptionHelper.convertFromActiveMQException(e);
}
@@ -777,6 +784,10 @@ public class ActiveMQMessage implements javax.jms.Message {
this.individualAck = true;
}
+ public void setClientAcknowledge() {
+ this.clientAck = true;
+ }
+
public void resetMessageID(final String newMsgID) {
this.msgID = newMsgID;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7b40abea/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageConsumer.java
----------------------------------------------------------------------
diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageConsumer.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageConsumer.java
index 3d7fa56..4664bb9 100644
--- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageConsumer.java
+++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageConsumer.java
@@ -237,6 +237,9 @@ public final class ActiveMQMessageConsumer implements QueueReceiver, TopicSubscr
// https://issues.jboss.org/browse/JBPAPP-6110
if (session.getAcknowledgeMode() == ActiveMQJMSConstants.INDIVIDUAL_ACKNOWLEDGE) {
jmsMsg.setIndividualAcknowledge();
+ } else if (session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE) {
+ jmsMsg.setClientAcknowledge();
+ coreMessage.acknowledge();
} else {
coreMessage.acknowledge();
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7b40abea/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/JMSMessageListenerWrapper.java
----------------------------------------------------------------------
diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/JMSMessageListenerWrapper.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/JMSMessageListenerWrapper.java
index 92ae226..5d9f6ed 100644
--- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/JMSMessageListenerWrapper.java
+++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/JMSMessageListenerWrapper.java
@@ -41,6 +41,8 @@ public class JMSMessageListenerWrapper implements MessageHandler {
private final boolean individualACK;
+ private final boolean clientACK;
+
protected JMSMessageListenerWrapper(final ConnectionFactoryOptions options,
final ActiveMQConnection connection,
final ActiveMQSession session,
@@ -60,6 +62,8 @@ public class JMSMessageListenerWrapper implements MessageHandler {
transactedOrClientAck = (ackMode == Session.SESSION_TRANSACTED || ackMode == Session.CLIENT_ACKNOWLEDGE) || session.isXA();
individualACK = (ackMode == ActiveMQJMSConstants.INDIVIDUAL_ACKNOWLEDGE);
+
+ clientACK = (ackMode == Session.CLIENT_ACKNOWLEDGE);
}
/**
@@ -74,11 +78,14 @@ public class JMSMessageListenerWrapper implements MessageHandler {
msg.setIndividualAcknowledge();
}
+ if (clientACK) {
+ msg.setClientAcknowledge();
+ }
+
try {
msg.doBeforeReceive();
} catch (Exception e) {
ActiveMQJMSClientLogger.LOGGER.errorPreparingMessageForReceipt(msg.getCoreMessage().toString(), e);
-
return;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7b40abea/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/consumer/JmsConsumerTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/consumer/JmsConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/consumer/JmsConsumerTest.java
index d242da8..5cefbd0 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/consumer/JmsConsumerTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/consumer/JmsConsumerTest.java
@@ -179,9 +179,10 @@ public class JmsConsumerTest extends JMSTestBase {
}
SimpleString queueName = new SimpleString(JmsConsumerTest.Q_NAME);
+ conn.close();
+
Assert.assertEquals(0, ((Queue) server.getPostOffice().getBinding(queueName).getBindable()).getDeliveringCount());
Assert.assertEquals(0, getMessageCount((Queue) server.getPostOffice().getBinding(queueName).getBindable()));
- conn.close();
}
@Test
@@ -225,9 +226,10 @@ public class JmsConsumerTest extends JMSTestBase {
}
SimpleString queueName = new SimpleString(JmsConsumerTest.Q_NAME);
+ context.close();
+
Assert.assertEquals(0, ((Queue) server.getPostOffice().getBinding(queueName).getBindable()).getDeliveringCount());
Assert.assertEquals(0, getMessageCount((Queue) server.getPostOffice().getBinding(queueName).getBindable()));
- context.close();
}
@Test
@@ -299,9 +301,10 @@ public class JmsConsumerTest extends JMSTestBase {
}
SimpleString queueName = new SimpleString(JmsConsumerTest.Q_NAME);
+ conn.close();
+
Assert.assertEquals(0, ((Queue) server.getPostOffice().getBinding(queueName).getBindable()).getDeliveringCount());
Assert.assertEquals(0, getMessageCount((Queue) server.getPostOffice().getBinding(queueName).getBindable()));
- conn.close();
}
@Test
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7b40abea/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/AcknowledgementTest.java
----------------------------------------------------------------------
diff --git a/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/AcknowledgementTest.java b/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/AcknowledgementTest.java
index 17927b1..a9ede4b 100644
--- a/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/AcknowledgementTest.java
+++ b/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/AcknowledgementTest.java
@@ -19,6 +19,8 @@ package org.apache.activemq.artemis.jms.tests;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
@@ -31,6 +33,9 @@ import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import java.util.concurrent.CountDownLatch;
+import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
+import org.apache.activemq.artemis.api.jms.JMSFactoryType;
+import org.apache.activemq.artemis.jms.client.ActiveMQJMSConnectionFactory;
import org.apache.activemq.artemis.jms.tests.util.ProxyAssertSupport;
import org.junit.Assert;
import org.junit.Test;
@@ -1297,4 +1302,68 @@ public class AcknowledgementTest extends JMSTestCase {
checkEmpty(queue1);
}
+
+ /**
+ * Ensure no blocking calls in acknowledge flow when block on acknowledge = false.
+ * This is done by checking the performance compared to blocking is much improved.
+ */
+ @Test
+ public void testNonBlockingAckPerf() throws Exception {
+ getJmsServerManager().createConnectionFactory("testsuitecf1", false, JMSFactoryType.CF, NETTY_CONNECTOR, null, ActiveMQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD, ActiveMQClient.DEFAULT_CONNECTION_TTL, ActiveMQClient.DEFAULT_CALL_TIMEOUT, ActiveMQClient.DEFAULT_CALL_FAILOVER_TIMEOUT, ActiveMQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT, ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, ActiveMQClient.DEFAULT_COMPRESS_LARGE_MESSAGES, ActiveMQClient.DEFAULT_CONSUMER_WINDOW_SIZE, ActiveMQClient.DEFAULT_CONSUMER_MAX_RATE, ActiveMQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE, ActiveMQClient.DEFAULT_PRODUCER_WINDOW_SIZE, ActiveMQClient.DEFAULT_PRODUCER_MAX_RATE, true, true, true, ActiveMQClient.DEFAULT_AUTO_GROUP, ActiveMQClient.DEFAULT_PRE_ACKNOWLEDGE, ActiveMQClient.DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME, ActiveMQClient.DEFAULT_ACK_BATCH_SIZE, ActiveMQClient.DEFAULT_ACK_BATCH_SIZE, ActiveMQClient.DEFAULT_USE_GLOBAL_POOLS, ActiveMQClient.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE, Ac
tiveMQClient.DEFAULT_THREAD_POOL_MAX_SIZE, ActiveMQClient.DEFAULT_RETRY_INTERVAL, ActiveMQClient.DEFAULT_RETRY_INTERVAL_MULTIPLIER, ActiveMQClient.DEFAULT_MAX_RETRY_INTERVAL, ActiveMQClient.DEFAULT_RECONNECT_ATTEMPTS, ActiveMQClient.DEFAULT_FAILOVER_ON_INITIAL_CONNECTION, null, "/testsuitecf1");
+ getJmsServerManager().createConnectionFactory("testsuitecf2", false, JMSFactoryType.CF, NETTY_CONNECTOR, null, ActiveMQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD, ActiveMQClient.DEFAULT_CONNECTION_TTL, ActiveMQClient.DEFAULT_CALL_TIMEOUT, ActiveMQClient.DEFAULT_CALL_FAILOVER_TIMEOUT, ActiveMQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT, ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, ActiveMQClient.DEFAULT_COMPRESS_LARGE_MESSAGES, ActiveMQClient.DEFAULT_CONSUMER_WINDOW_SIZE, ActiveMQClient.DEFAULT_CONSUMER_MAX_RATE, ActiveMQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE, ActiveMQClient.DEFAULT_PRODUCER_WINDOW_SIZE, ActiveMQClient.DEFAULT_PRODUCER_MAX_RATE, true, true, true, ActiveMQClient.DEFAULT_AUTO_GROUP, ActiveMQClient.DEFAULT_PRE_ACKNOWLEDGE, ActiveMQClient.DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME, ActiveMQClient.DEFAULT_ACK_BATCH_SIZE, ActiveMQClient.DEFAULT_ACK_BATCH_SIZE, ActiveMQClient.DEFAULT_USE_GLOBAL_POOLS, ActiveMQClient.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE, Ac
tiveMQClient.DEFAULT_THREAD_POOL_MAX_SIZE, ActiveMQClient.DEFAULT_RETRY_INTERVAL, ActiveMQClient.DEFAULT_RETRY_INTERVAL_MULTIPLIER, ActiveMQClient.DEFAULT_MAX_RETRY_INTERVAL, ActiveMQClient.DEFAULT_RECONNECT_ATTEMPTS, ActiveMQClient.DEFAULT_FAILOVER_ON_INITIAL_CONNECTION, null, "/testsuitecf2");
+
+ ActiveMQJMSConnectionFactory cf1 = (ActiveMQJMSConnectionFactory) getInitialContext().lookup("/testsuitecf1");
+ cf1.setBlockOnAcknowledge(false);
+ ActiveMQJMSConnectionFactory cf2 = (ActiveMQJMSConnectionFactory) getInitialContext().lookup("/testsuitecf2");
+ cf2.setBlockOnAcknowledge(true);
+
+ int messageCount = 10000;
+
+ long sendT1 = send(cf1, queue1, messageCount);
+ long sendT2 = send(cf2, queue2, messageCount);
+
+ long time1 = consume(cf1, queue1, messageCount);
+ long time2 = consume(cf2, queue2, messageCount);
+
+ log.info("BlockOnAcknowledge=false MessageCount=" + messageCount + " TimeToConsume=" + time1);
+ log.info("BlockOnAcknowledge=true MessageCount=" + messageCount + " TimeToConsume=" + time2);
+
+ Assert.assertTrue(time1 < (time2 / 2));
+
+ }
+
+ private long send(ConnectionFactory connectionFactory, Destination destination, int messageCount) throws JMSException {
+ try (Connection connection = connectionFactory.createConnection()) {
+ connection.start();
+ try (Session session = connection.createSession(true, Session.CLIENT_ACKNOWLEDGE)) {
+ MessageProducer producer = session.createProducer(destination);
+ Message m = session.createTextMessage("testing123");
+ long start = System.nanoTime();
+ for (int i = 0; i < messageCount; i++) {
+ producer.send(m);
+ }
+ session.commit();
+ long end = System.nanoTime();
+ return end - start;
+ }
+ }
+ }
+
+ private long consume(ConnectionFactory connectionFactory, Destination destination, int messageCount) throws JMSException {
+ try (Connection connection = connectionFactory.createConnection()) {
+ connection.start();
+ try (Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE)) {
+ MessageConsumer consumer = session.createConsumer(destination);
+ long start = System.nanoTime();
+ for (int i = 0; i < messageCount; i++) {
+ Message message = consumer.receive(100);
+ if (message != null) {
+ message.acknowledge();
+ }
+ }
+ long end = System.nanoTime();
+ return end - start;
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7b40abea/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/message/MessageHeaderTest.java
----------------------------------------------------------------------
diff --git a/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/message/MessageHeaderTest.java b/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/message/MessageHeaderTest.java
index efea045..39ea0e3 100644
--- a/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/message/MessageHeaderTest.java
+++ b/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/message/MessageHeaderTest.java
@@ -1276,6 +1276,10 @@ public class MessageHeaderTest extends MessageHeaderTestBase {
}
@Override
+ public void commit(boolean block) throws ActiveMQException {
+ }
+
+ @Override
public boolean isRollbackOnly() {
return false;