You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2015/02/23 22:45:22 UTC

activemq git commit: https://issues.apache.org/jira/browse/AMQ-5229 - implement ability to pause/resume dispatch of messages to all consumers of a queue

Repository: activemq
Updated Branches:
  refs/heads/master 1406d40ac -> 85b9c81a3


https://issues.apache.org/jira/browse/AMQ-5229 - implement ability to pause/resume dispatch of messages to all consumers of a queue


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/85b9c81a
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/85b9c81a
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/85b9c81a

Branch: refs/heads/master
Commit: 85b9c81a3f2431b8272c19acf4e4b1cddeb25c5e
Parents: 1406d40
Author: gtully <ga...@gmail.com>
Authored: Mon Feb 23 21:41:10 2015 +0000
Committer: gtully <ga...@gmail.com>
Committed: Mon Feb 23 21:41:33 2015 +0000

----------------------------------------------------------------------
 .../apache/activemq/broker/jmx/QueueView.java   | 18 ++++++
 .../activemq/broker/jmx/QueueViewMBean.java     | 11 ++++
 .../apache/activemq/broker/region/Queue.java    | 14 ++++
 .../broker/region/QueueDispatchSelector.java    | 19 ++++--
 .../apache/activemq/broker/jmx/MBeanTest.java   | 67 +++++++++++++++++++-
 5 files changed, 123 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/85b9c81a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/QueueView.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/QueueView.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/QueueView.java
index 76d82a3..076b4fc 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/QueueView.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/QueueView.java
@@ -226,4 +226,22 @@ public class QueueView extends DestinationView implements QueueViewMBean {
         Queue queue = (Queue) destination;
         queue.getMessageGroupOwners().removeAll();
     }
+
+    /**
+     * Pauses dispatch to consumers by delegating to the underlying {@link Queue}.
+     */
+    @Override
+    public void pause() {
+        ((Queue) destination).pauseDispatch();
+    }
+
+    /**
+     * Resumes dispatch to consumers by delegating to the underlying {@link Queue}.
+     */
+    @Override
+    public void resume() {
+        ((Queue) destination).resumeDispatch();
+    }
+
+    /**
+     * Reports the pause state of the underlying {@link Queue}.
+     *
+     * @return the value of {@code Queue.isDispatchPaused()}
+     */
+    @Override
+    public boolean isPaused() {
+        return ((Queue) destination).isDispatchPaused();
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/85b9c81a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/QueueViewMBean.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/QueueViewMBean.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/QueueViewMBean.java
index 3f99162..27ef61c 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/QueueViewMBean.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/QueueViewMBean.java
@@ -209,4 +209,15 @@ public interface QueueViewMBean extends DestinationViewMBean {
      */
     @MBeanInfo("emove all the message groups - will rebalance all message groups across consumers")
     void removeAllMessageGroups();
+
+    /**
+     * Pauses dispatch of messages to the consumers of this queue.
+     */
+    @MBeanInfo("pause dispatch to consumers")
+    void pause();
+
+    /**
+     * Resumes dispatch of messages to the consumers of this queue if dispatch is paused.
+     */
+    @MBeanInfo("resume dispatch to consumers if paused")
+    void resume();
+
+    /**
+     * @return true if dispatch to consumers of this queue is paused
+     */
+    @MBeanInfo("Dispatch to consumers is paused")
+    boolean isPaused();
+
+
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/85b9c81a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
index c0a237f..67b9119 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
@@ -147,6 +147,8 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
 
     private final Object iteratingMutex = new Object();
 
+
+
     class TimeoutMessage implements Delayed {
 
         Message message;
@@ -1649,6 +1651,18 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
         }
     }
 
+    /**
+     * Pauses dispatch for this queue by pausing its dispatch selector.
+     */
+    public void pauseDispatch() {
+        dispatchSelector.pause();
+    }
+
+    /**
+     * Resumes dispatch for this queue after a {@link #pauseDispatch()}.
+     * <p>
+     * Resuming the selector only clears its paused flag; it does not run a dispatch pass.
+     * The queue is therefore woken up explicitly so that consumers which were already
+     * subscribed can receive messages that are pending, instead of waiting for an
+     * unrelated event to trigger dispatch.
+     */
+    public void resumeDispatch() {
+        dispatchSelector.resume();
+        // NOTE(review): relies on Queue.wakeup() to schedule a dispatch pass - confirm
+        wakeup();
+    }
+
+    /**
+     * @return the paused state reported by this queue's dispatch selector
+     */
+    public boolean isDispatchPaused() {
+        return dispatchSelector.isPaused();
+    }
+
     protected MessageReferenceFilter createMessageIdFilter(final String messageId) {
         return new MessageReferenceFilter() {
             @Override

http://git-wip-us.apache.org/repos/asf/activemq/blob/85b9c81a/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueDispatchSelector.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueDispatchSelector.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueDispatchSelector.java
index c73d960..56f6076 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueDispatchSelector.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueDispatchSelector.java
@@ -30,8 +30,8 @@ import org.slf4j.LoggerFactory;
 public class QueueDispatchSelector extends SimpleDispatchSelector {
     private static final Logger LOG = LoggerFactory.getLogger(QueueDispatchSelector.class);
     private Subscription exclusiveConsumer;
-   
-   
+    // Set by pause()/resume() and read by canSelect(). Declared volatile so that a change
+    // made by the thread invoking pause()/resume() is visible to the threads evaluating
+    // canSelect() without any further synchronization.
+    private volatile boolean paused;
+
     /**
      * @param destination
      */
@@ -54,11 +54,22 @@ public class QueueDispatchSelector extends SimpleDispatchSelector {
     public boolean canSelect(Subscription subscription,
             MessageReference m) throws Exception {
        
-        boolean result =  super.canDispatch(subscription, m);
+        // while paused, reject every subscription without consulting super.canDispatch()
+        boolean result = !paused && super.canDispatch(subscription, m);
         if (result && !subscription.isBrowser()) {
             result = exclusiveConsumer == null || exclusiveConsumer == subscription;
         }
         return result;
     }
-    
+
+    /**
+     * Sets the paused flag; while it is set, {@code canSelect} returns false
+     * for every subscription.
+     */
+    public void pause() {
+        paused = true;
+    }
+
+    /**
+     * Clears the paused flag so that {@code canSelect} applies its normal checks again.
+     */
+    public void resume() {
+        paused = false;
+    }
+
+    /**
+     * @return true if {@link #pause()} has been called without a subsequent {@link #resume()}
+     */
+    public boolean isPaused() {
+        return paused;
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/85b9c81a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java
index e2b0c51..95b7ff0 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java
@@ -818,13 +818,13 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
         return answer;
     }
 
-    protected void useConnection(Connection connection) throws Exception {
+    protected void useConnection(Connection connection, int numToSend) throws Exception {
         connection.setClientID(clientID);
         connection.start();
         Session session = connection.createSession(transacted, authMode);
         destination = createDestination();
         MessageProducer producer = session.createProducer(destination);
-        for (int i = 0; i < MESSAGE_COUNT; i++) {
+        for (int i = 0; i < numToSend; i++) {
             Message message = session.createTextMessage("Message: " + i);
             message.setIntProperty("counter", i);
             message.setJMSCorrelationID("MyCorrelationID");
@@ -836,6 +836,10 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
         Thread.sleep(1000);
     }
 
+    /**
+     * Delegates to {@code useConnection(Connection, int)} with {@code MESSAGE_COUNT}
+     * as the number of messages.
+     */
+    protected void useConnection(Connection connection) throws Exception {
+        useConnection(connection, MESSAGE_COUNT);
+    }
+
     protected void useConnectionWithBlobMessage(Connection connection) throws Exception {
         connection.setClientID(clientID);
         connection.start();
@@ -1505,4 +1509,63 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
         assertEquals(mbeans.size(), 1);
         sub.close();
     }
+
+    /**
+     * Exercises pause/resume of queue dispatch through the {@code QueueViewMBean}:
+     * messages can be consumed before the pause, a consumer receives nothing while the
+     * queue is paused, sending and browsing still work while paused, and every message
+     * left on the queue can be consumed after resume.
+     */
+    public void testQueuePauseResume() throws Exception {
+        connection = connectionFactory.createConnection();
+        final int numToSend = 20;
+        final int numToConsume = 5;
+        useConnection(connection, numToSend);
+        ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Queue,destinationName=" + getDestinationString());
+
+        QueueViewMBean queue = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
+
+        CompositeData[] compdatalist = queue.browse();
+        int initialQueueSize = compdatalist.length;
+        assertEquals("expected", numToSend, initialQueueSize);
+
+        // consume a few messages before pausing to show that dispatch works normally
+        echo("Attempting to consume " + numToConsume + " messages from: " + destination);
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer = session.createConsumer(destination);
+        for (int i = 0; i < numToConsume; i++) {
+            assertNotNull("Message: " + i, consumer.receive(5000));
+        }
+        consumer.close();
+        session.close();
+
+        compdatalist = queue.browse();
+        assertEquals("expected", numToSend - numToConsume, compdatalist.length);
+
+        echo("pause");
+        queue.pause();
+
+        assertTrue("queue is paused", queue.isPaused());
+
+        // verify no consume while paused
+        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        consumer = session.createConsumer(destination);
+        assertNull("cannot get message while paused", consumer.receive(2000));
+        consumer.close();
+        session.close();
+        connection.close();
+
+        // verify send while paused
+        connection = connectionFactory.createConnection();
+        useConnection(connection, numToSend);
+
+        // verify browse
+        compdatalist = queue.browse();
+        assertEquals("expected browse", (2 * numToSend) - numToConsume, compdatalist.length);
+        assertEquals("expected message count", compdatalist.length, queue.getQueueSize());
+
+        echo("resume");
+        queue.resume();
+
+        assertFalse("queue is not paused", queue.isPaused());
+
+        // everything still on the queue must be deliverable once dispatch is resumed
+        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        consumer = session.createConsumer(destination);
+        for (int i = 0; i < compdatalist.length; i++) {
+            assertNotNull("Message: " + i, consumer.receive(5000));
+        }
+        consumer.close();
+        session.close();
+    }
 }