You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2016/10/07 18:47:42 UTC

[1/2] activemq-artemis git commit: ARTEMIS-767 consumer with pre-ack flagged as slow

Repository: activemq-artemis
Updated Branches:
  refs/heads/master 1b157addd -> 736886fc1


ARTEMIS-767 consumer with pre-ack flagged as slow


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/59bff3b3
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/59bff3b3
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/59bff3b3

Branch: refs/heads/master
Commit: 59bff3b36b14fd00c0c06f3a53320a5f9c99f403
Parents: 1b157ad
Author: jbertram <jb...@apache.com>
Authored: Fri Oct 7 11:43:19 2016 -0500
Committer: Clebert Suconic <cl...@apache.org>
Committed: Fri Oct 7 14:47:21 2016 -0400

----------------------------------------------------------------------
 .../core/server/impl/ServerConsumerImpl.java    |  1 +
 .../integration/client/SlowConsumerTest.java    | 70 ++++++++++++++++++++
 2 files changed, 71 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/59bff3b3/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
index 24eacf5..1318ff3 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
@@ -381,6 +381,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
 
                // With pre-ack, we ack *before* sending to the client
                ref.getQueue().acknowledge(ref);
+               acks++;
             }
 
             if (message.isLargeMessage() && this.supportLargeMessage) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/59bff3b3/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SlowConsumerTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SlowConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SlowConsumerTest.java
index a2d60d9..3643f77 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SlowConsumerTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SlowConsumerTest.java
@@ -43,6 +43,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.artemis.core.settings.impl.SlowConsumerPolicy;
 import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
+import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
 import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
 import org.apache.activemq.artemis.utils.RandomUtil;
 import org.junit.Before;
@@ -210,6 +211,75 @@ public class SlowConsumerTest extends ActiveMQTestBase {
    }
 
    @Test
+   public void testSlowConsumerWithPreAckNotification() throws Exception {
+
+      ClientSessionFactory sf = createSessionFactory(locator);
+
+      ClientSession session = addClientSession(sf.createSession(false, true, true, true));
+
+      session.createQueue(QUEUE, QUEUE, null, false);
+
+      AddressSettings addressSettings = new AddressSettings().setSlowConsumerCheckPeriod(2).setSlowConsumerThreshold(1).setSlowConsumerPolicy(SlowConsumerPolicy.NOTIFY);
+
+      server.getAddressSettingsRepository().removeMatch(QUEUE.toString());
+      server.getAddressSettingsRepository().addMatch(QUEUE.toString(), addressSettings);
+
+      ClientProducer producer = addClientProducer(session.createProducer(QUEUE));
+
+      final int numMessages = 25;
+
+      for (int i = 0; i < numMessages; i++) {
+         producer.send(createTextMessage(session, "m" + i));
+      }
+
+      SimpleString notifQueue = RandomUtil.randomSimpleString();
+
+      session.createQueue(ActiveMQDefaultConfiguration.getDefaultManagementNotificationAddress(), notifQueue, null, false);
+
+      ClientConsumer notifConsumer = session.createConsumer(notifQueue.toString(), ManagementHelper.HDR_NOTIFICATION_TYPE + "='" + CoreNotificationType.CONSUMER_SLOW + "'");
+
+      final CountDownLatch notifLatch = new CountDownLatch(1);
+
+      notifConsumer.setMessageHandler(new MessageHandler() {
+         @Override
+         public void onMessage(ClientMessage message) {
+            assertEquals(CoreNotificationType.CONSUMER_SLOW.toString(), message.getObjectProperty(ManagementHelper.HDR_NOTIFICATION_TYPE).toString());
+            IntegrationTestLogger.LOGGER.info("Slow consumer detected!");
+            assertEquals(QUEUE.toString(), message.getObjectProperty(ManagementHelper.HDR_ADDRESS).toString());
+            assertEquals(Integer.valueOf(1), message.getIntProperty(ManagementHelper.HDR_CONSUMER_COUNT));
+            if (isNetty) {
+               assertTrue(message.getSimpleStringProperty(ManagementHelper.HDR_REMOTE_ADDRESS).toString().startsWith("/127.0.0.1"));
+            } else {
+               assertEquals(SimpleString.toSimpleString("invm:0"), message.getSimpleStringProperty(ManagementHelper.HDR_REMOTE_ADDRESS));
+            }
+            assertNotNull(message.getSimpleStringProperty(ManagementHelper.HDR_CONNECTION_NAME));
+            assertNotNull(message.getSimpleStringProperty(ManagementHelper.HDR_CONSUMER_NAME));
+            assertNotNull(message.getSimpleStringProperty(ManagementHelper.HDR_SESSION_NAME));
+            try {
+               message.acknowledge();
+            } catch (ActiveMQException e) {
+               e.printStackTrace();
+            }
+            notifLatch.countDown();
+         }
+      });
+
+      ClientConsumer consumer = addClientConsumer(session.createConsumer(QUEUE));
+      session.start();
+
+      for (int i = 0; i < numMessages; i++) {
+         ClientMessage msg = consumer.receive(1000);
+         assertNotNull(msg);
+         IntegrationTestLogger.LOGGER.info("Received message.");
+         msg.acknowledge();
+         session.commit();
+         Thread.sleep(100);
+      }
+
+      assertFalse(notifLatch.await(3, TimeUnit.SECONDS));
+   }
+
+   @Test
    public void testSlowConsumerSpared() throws Exception {
       ClientSessionFactory sf = createSessionFactory(locator);
 


[2/2] activemq-artemis git commit: This closes #825

Posted by cl...@apache.org.
This closes #825


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/736886fc
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/736886fc
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/736886fc

Branch: refs/heads/master
Commit: 736886fc131c5f384478fcf57b29e78053ed5321
Parents: 1b157ad 59bff3b
Author: Clebert Suconic <cl...@apache.org>
Authored: Fri Oct 7 14:47:22 2016 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Fri Oct 7 14:47:22 2016 -0400

----------------------------------------------------------------------
 .../core/server/impl/ServerConsumerImpl.java    |  1 +
 .../integration/client/SlowConsumerTest.java    | 70 ++++++++++++++++++++
 2 files changed, 71 insertions(+)
----------------------------------------------------------------------