You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2016/10/07 18:47:42 UTC
[1/2] activemq-artemis git commit: ARTEMIS-767 consumer with pre-ack flagged as slow
Repository: activemq-artemis
Updated Branches:
refs/heads/master 1b157addd -> 736886fc1
ARTEMIS-767 consumer with pre-ack flagged as slow
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/59bff3b3
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/59bff3b3
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/59bff3b3
Branch: refs/heads/master
Commit: 59bff3b36b14fd00c0c06f3a53320a5f9c99f403
Parents: 1b157ad
Author: jbertram <jb...@apache.com>
Authored: Fri Oct 7 11:43:19 2016 -0500
Committer: Clebert Suconic <cl...@apache.org>
Committed: Fri Oct 7 14:47:21 2016 -0400
----------------------------------------------------------------------
.../core/server/impl/ServerConsumerImpl.java | 1 +
.../integration/client/SlowConsumerTest.java | 70 ++++++++++++++++++++
2 files changed, 71 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/59bff3b3/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
index 24eacf5..1318ff3 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
@@ -381,6 +381,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
// With pre-ack, we ack *before* sending to the client
ref.getQueue().acknowledge(ref);
+ acks++;
}
if (message.isLargeMessage() && this.supportLargeMessage) {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/59bff3b3/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SlowConsumerTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SlowConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SlowConsumerTest.java
index a2d60d9..3643f77 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SlowConsumerTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SlowConsumerTest.java
@@ -43,6 +43,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.settings.impl.SlowConsumerPolicy;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
+import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.utils.RandomUtil;
import org.junit.Before;
@@ -210,6 +211,75 @@ public class SlowConsumerTest extends ActiveMQTestBase {
}
@Test
+ public void testSlowConsumerWithPreAckNotification() throws Exception {
+ // Test for ARTEMIS-767: consume from a queue configured with the NOTIFY slow-consumer policy and expect that no CONSUMER_SLOW notification arrives.
+ ClientSessionFactory sf = createSessionFactory(locator);
+ // NOTE(review): presumably the final `true` enables pre-acknowledge on this session - confirm against ClientSessionFactory.createSession.
+ ClientSession session = addClientSession(sf.createSession(false, true, true, true));
+
+ session.createQueue(QUEUE, QUEUE, null, false);
+ // Slow-consumer settings: check period 2, threshold 1, policy NOTIFY (units are not visible here - TODO confirm).
+ AddressSettings addressSettings = new AddressSettings().setSlowConsumerCheckPeriod(2).setSlowConsumerThreshold(1).setSlowConsumerPolicy(SlowConsumerPolicy.NOTIFY);
+
+ server.getAddressSettingsRepository().removeMatch(QUEUE.toString()); // drop the current match for QUEUE before adding the settings built above
+ server.getAddressSettingsRepository().addMatch(QUEUE.toString(), addressSettings);
+
+ ClientProducer producer = addClientProducer(session.createProducer(QUEUE));
+
+ final int numMessages = 25;
+ // Send text messages "m0" .. "m24" through the producer created on QUEUE.
+ for (int i = 0; i < numMessages; i++) {
+ producer.send(createTextMessage(session, "m" + i));
+ }
+ // Create a randomly named queue on the default management notification address.
+ SimpleString notifQueue = RandomUtil.randomSimpleString();
+
+ session.createQueue(ActiveMQDefaultConfiguration.getDefaultManagementNotificationAddress(), notifQueue, null, false);
+ // The filter string selects only notifications whose type is CONSUMER_SLOW.
+ ClientConsumer notifConsumer = session.createConsumer(notifQueue.toString(), ManagementHelper.HDR_NOTIFICATION_TYPE + "='" + CoreNotificationType.CONSUMER_SLOW + "'");
+ // Counted down by the handler below once it has processed a notification.
+ final CountDownLatch notifLatch = new CountDownLatch(1);
+ // NOTE(review): the assertions below run inside the message handler; presumably a failure there would not fail the test thread directly - confirm.
+ notifConsumer.setMessageHandler(new MessageHandler() {
+ @Override
+ public void onMessage(ClientMessage message) {
+ assertEquals(CoreNotificationType.CONSUMER_SLOW.toString(), message.getObjectProperty(ManagementHelper.HDR_NOTIFICATION_TYPE).toString());
+ IntegrationTestLogger.LOGGER.info("Slow consumer detected!");
+ assertEquals(QUEUE.toString(), message.getObjectProperty(ManagementHelper.HDR_ADDRESS).toString());
+ assertEquals(Integer.valueOf(1), message.getIntProperty(ManagementHelper.HDR_CONSUMER_COUNT));
+ if (isNetty) {
+ assertTrue(message.getSimpleStringProperty(ManagementHelper.HDR_REMOTE_ADDRESS).toString().startsWith("/127.0.0.1"));
+ } else {
+ assertEquals(SimpleString.toSimpleString("invm:0"), message.getSimpleStringProperty(ManagementHelper.HDR_REMOTE_ADDRESS));
+ }
+ assertNotNull(message.getSimpleStringProperty(ManagementHelper.HDR_CONNECTION_NAME));
+ assertNotNull(message.getSimpleStringProperty(ManagementHelper.HDR_CONSUMER_NAME));
+ assertNotNull(message.getSimpleStringProperty(ManagementHelper.HDR_SESSION_NAME));
+ try {
+ message.acknowledge();
+ } catch (ActiveMQException e) {
+ e.printStackTrace(); // an acknowledge failure is only printed, not rethrown
+ }
+ notifLatch.countDown();
+ }
+ });
+ // Receive every message sent above; each one is acknowledged and committed, followed by a 100 ms pause.
+ ClientConsumer consumer = addClientConsumer(session.createConsumer(QUEUE));
+ session.start();
+
+ for (int i = 0; i < numMessages; i++) {
+ ClientMessage msg = consumer.receive(1000);
+ assertNotNull(msg);
+ IntegrationTestLogger.LOGGER.info("Received message.");
+ msg.acknowledge();
+ session.commit();
+ Thread.sleep(100);
+ }
+ // Passes only if the latch was NOT counted down within 3 seconds, i.e. the handler never finished processing a CONSUMER_SLOW notification.
+ assertFalse(notifLatch.await(3, TimeUnit.SECONDS));
+ }
+
+ @Test
public void testSlowConsumerSpared() throws Exception {
ClientSessionFactory sf = createSessionFactory(locator);
[2/2] activemq-artemis git commit: This closes #825
Posted by cl...@apache.org.
This closes #825
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/736886fc
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/736886fc
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/736886fc
Branch: refs/heads/master
Commit: 736886fc131c5f384478fcf57b29e78053ed5321
Parents: 1b157ad 59bff3b
Author: Clebert Suconic <cl...@apache.org>
Authored: Fri Oct 7 14:47:22 2016 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Fri Oct 7 14:47:22 2016 -0400
----------------------------------------------------------------------
.../core/server/impl/ServerConsumerImpl.java | 1 +
.../integration/client/SlowConsumerTest.java | 70 ++++++++++++++++++++
2 files changed, 71 insertions(+)
----------------------------------------------------------------------